test_services.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 import time
00007 import random
00008 
00009 from rosbridge_library.internal import services, ros_loader
00010 from rosbridge_library.internal import message_conversion as c
00011 from rosbridge_library.internal.message_conversion import FieldTypeMismatchException
00012 
00013 from roscpp.srv import GetLoggers
00014 
00015 
00016 def populate_random_args(d):
00017     # Given a dictionary d, replaces primitives with random values
00018     if isinstance(d, dict):
00019         for x in d:
00020             d[x] = populate_random_args(d[x])
00021         return d
00022     elif isinstance(d, str):
00023         return str(random.random())
00024     elif isinstance(d, unicode):
00025         return unicode(random.random())
00026     elif isinstance(d, bool):
00027         return True
00028     elif isinstance(d, int):
00029         return random.randint(100, 200)
00030     elif isinstance(d, float):
00031         return 3.5
00032     else:
00033         return d
00034 
00035 
00036 class ServiceTester:
00037 
00038     def __init__(self, name, srv_type):
00039         self.name = name
00040         self.srvClass = ros_loader.get_service_class(srv_type)
00041         self.service = rospy.Service(name, self.srvClass, self.callback)
00042 
00043     def start(self):
00044         req = self.srvClass._request_class()
00045         gen = c.extract_values(req)
00046         gen = populate_random_args(gen)
00047         self.input = gen
00048         services.ServiceCaller(self.name, gen, self.success, self.error).start()
00049 
00050     def callback(self, req):
00051         self.req = req
00052         time.sleep(0.5)
00053         rsp = self.srvClass._response_class()
00054         gen = c.extract_values(rsp)
00055         gen = populate_random_args(gen)
00056         try:
00057             rsp = c.populate_instance(gen, rsp)
00058         except:
00059             print "populating instance"
00060             print rsp
00061             print "populating with"
00062             print gen
00063             raise
00064         self.output = gen
00065         return rsp
00066 
00067     def success(self, rsp):
00068         self.rsp = rsp
00069 
00070     def error(self, exc):
00071         self.exc = exc
00072 
00073     def validate(self, equality_function):
00074         if hasattr(self, "exc"):
00075             print self.exc
00076             print self.exc.message
00077             raise self.exc
00078         equality_function(self.input, c.extract_values(self.req))
00079         equality_function(self.output, self.rsp)
00080 
00081 
00082 class TestServices(unittest.TestCase):
00083 
00084     def setUp(self):
00085         rospy.init_node("test_services")
00086 
00087     def msgs_equal(self, msg1, msg2):
00088         if type(msg1) in [str, unicode] and type(msg2) in [str, unicode]:
00089             pass
00090         else:
00091             self.assertEqual(type(msg1), type(msg2))
00092         if type(msg1) in c.list_types:
00093             for x, y in zip(msg1, msg2):
00094                 self.msgs_equal(x, y)
00095         elif type(msg1) in c.primitive_types or type(msg1) in c.string_types:
00096             self.assertEqual(msg1, msg2)
00097         else:
00098             for x in msg1:
00099                 self.assertTrue(x in msg2)
00100             for x in msg2:
00101                 self.assertTrue(x in msg1)
00102             for x in msg1:
00103                 self.msgs_equal(msg1[x], msg2[x])
00104 
00105     def test_populate_request_args(self):
00106         # Test empty messages
00107         for srv_type in ["TestEmpty", "TestResponseOnly"]:
00108             cls = ros_loader.get_service_class("rosbridge_library/" + srv_type)
00109             for args in [[], {}, None]:
00110                 # Should throw no exceptions
00111                 services.args_to_service_request_instance("", cls._request_class(), args)
00112 
00113         # Test msgs with data message
00114         for srv_type in ["TestRequestOnly", "TestRequestAndResponse"]:
00115             cls = ros_loader.get_service_class("rosbridge_library/" + srv_type)
00116             for args in [[3], {"data": 3}]:
00117                 # Should throw no exceptions
00118                 services.args_to_service_request_instance("", cls._request_class(), args)
00119             self.assertRaises(FieldTypeMismatchException, services.args_to_service_request_instance, "", cls._request_class(), ["hello"])
00120 
00121         # Test message with multiple fields
00122         cls = ros_loader.get_service_class("rosbridge_library/TestMultipleRequestFields")
00123         for args in [[3, 3.5, "hello", False], {"int": 3, "float": 3.5, "string": "hello", "bool": False}]:
00124             # Should throw no exceptions
00125             services.args_to_service_request_instance("", cls._request_class(), args)
00126 
00127     def test_service_call(self):
00128         """ Test a simple getloggers service call """
00129         # First, call the service the 'proper' way
00130         p = rospy.ServiceProxy("/rosout/get_loggers", GetLoggers)
00131         ret = p()
00132 
00133         # Now, call using the services
00134         json_ret = services.call_service("/rosout/get_loggers")
00135         for x, y in zip(ret.loggers, json_ret["loggers"]):
00136             self.assertEqual(x.name, y["name"])
00137             self.assertEqual(x.level, y["level"])
00138 
00139     def test_service_caller(self):
00140         """ Same as test_service_call but via the thread caller """
00141         # First, call the service the 'proper' way
00142         p = rospy.ServiceProxy("/rosout/get_loggers", GetLoggers)
00143         ret = p()
00144 
00145         rcvd = {"json": None}
00146 
00147         def success(json):
00148             rcvd["json"] = json
00149 
00150         def error():
00151             raise Exception()
00152 
00153         # Now, call using the services
00154         services.ServiceCaller("/rosout/get_loggers", None, success, error).start()
00155 
00156         time.sleep(0.5)
00157 
00158         for x, y in zip(ret.loggers, rcvd["json"]["loggers"]):
00159             self.assertEqual(x.name, y["name"])
00160             self.assertEqual(x.level, y["level"])
00161 
00162     def test_service_tester(self):
00163         t = ServiceTester("/test_service_tester", "rosbridge_library/TestRequestAndResponse")
00164         t.start()
00165         time.sleep(1.0)
00166         t.validate(self.msgs_equal)
00167 
00168     def test_service_tester_alltypes(self):
00169         ts = []
00170         for srv in ["TestResponseOnly", "TestEmpty", "TestRequestAndResponse", "TestRequestOnly",
00171             "TestMultipleResponseFields", "TestMultipleRequestFields", "TestArrayRequest"]:
00172             t = ServiceTester("/test_service_tester_alltypes_" + srv, "rosbridge_library/" + srv)
00173             t.start()
00174             ts.append(t)
00175 
00176         time.sleep(1.0)
00177 
00178         for t in ts:
00179             t.validate(self.msgs_equal)
00180 
00181     def test_random_service_types(self):
00182         common = ["roscpp/GetLoggers", "roscpp/SetLoggerLevel",
00183         "std_srvs/Empty", "nav_msgs/GetMap", "nav_msgs/GetPlan",
00184         "sensor_msgs/SetCameraInfo", "topic_tools/MuxAdd",
00185         "topic_tools/MuxSelect", "tf2_msgs/FrameGraph",
00186         "rospy_tutorials/BadTwoInts", "rospy_tutorials/AddTwoInts"]
00187         ts = []
00188         for srv in common:
00189             t = ServiceTester("/test_random_service_types/" + srv, srv)
00190             t.start()
00191             ts.append(t)
00192 
00193         time.sleep(1.0)
00194 
00195         for t in ts:
00196             t.validate(self.msgs_equal)
00197 
00198 
00199 PKG = 'rosbridge_library'
00200 NAME = 'test_services'
00201 if __name__ == '__main__':
00202     rostest.unitrun(PKG, NAME, TestServices)
00203 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Mon Oct 6 2014 06:58:09