test_services.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 PKG = 'rosbridge_library'
00003 import roslib; roslib.load_manifest(PKG)
00004 import rospy
00005 
00006 import time
00007 import random
00008 
00009 from rosbridge_library.internal import services, ros_loader
00010 from rosbridge_library.internal import message_conversion as c
00011 from rosbridge_library.internal.message_conversion import FieldTypeMismatchException
00012 
00013 roslib.load_manifest("roscpp")
00014 from roscpp.srv import GetLoggers
00015 
00016 import unittest
00017 
00018 
00019 def populate_random_args(d):
00020     # Given a dictionary d, replaces primitives with random values
00021     if isinstance(d, dict):
00022         for x in d:
00023             d[x] = populate_random_args(d[x])
00024         return d
00025     elif isinstance(d, str):
00026         return str(random.random())
00027     elif isinstance(d, unicode):
00028         return unicode(random.random())
00029     elif isinstance(d, bool):
00030         return True
00031     elif isinstance(d, int):
00032         return random.randint(100, 200)
00033     elif isinstance(d, float):
00034         return 3.5
00035     else:
00036         return d
00037 
00038 
00039 class ServiceTester:
00040 
00041     def __init__(self, name, srv_type):
00042         self.name = name
00043         self.srvClass = ros_loader.get_service_class(srv_type)
00044         self.service = rospy.Service(name, self.srvClass, self.callback)
00045 
00046     def start(self):
00047         req = self.srvClass._request_class()
00048         gen = c.extract_values(req)
00049         gen = populate_random_args(gen)
00050         self.input = gen
00051         services.ServiceCaller(self.name, gen, self.success, self.error).start()
00052 
00053     def callback(self, req):
00054         self.req = req
00055         time.sleep(0.5)
00056         rsp = self.srvClass._response_class()
00057         gen = c.extract_values(rsp)
00058         gen = populate_random_args(gen)
00059         try:
00060             rsp = c.populate_instance(gen, rsp)
00061         except:
00062             print "populating instance"
00063             print rsp
00064             print "populating with"
00065             print gen
00066             raise
00067         self.output = gen
00068         return rsp
00069 
00070     def success(self, rsp):
00071         self.rsp = rsp
00072 
00073     def error(self, exc):
00074         self.exc = exc
00075 
00076     def validate(self, equality_function):
00077         if hasattr(self, "exc"):
00078             print self.exc
00079             print self.exc.message
00080             raise self.exc
00081         equality_function(self.input, c.extract_values(self.req))
00082         equality_function(self.output, self.rsp)
00083 
00084 
00085 class TestServices(unittest.TestCase):
00086 
00087     def setUp(self):
00088         rospy.init_node("test_services")
00089 
00090     def msgs_equal(self, msg1, msg2):
00091         if type(msg1) in [str, unicode] and type(msg2) in [str, unicode]:
00092             pass
00093         else:
00094             self.assertEqual(type(msg1), type(msg2))
00095         if type(msg1) in c.list_types:
00096             for x, y in zip(msg1, msg2):
00097                 self.msgs_equal(x, y)
00098         elif type(msg1) in c.primitive_types or type(msg1) in c.string_types:
00099             self.assertEqual(msg1, msg2)
00100         else:
00101             for x in msg1:
00102                 self.assertTrue(x in msg2)
00103             for x in msg2:
00104                 self.assertTrue(x in msg1)
00105             for x in msg1:
00106                 self.msgs_equal(msg1[x], msg2[x])
00107 
00108     def test_populate_request_args(self):
00109         # Test empty messages
00110         for srv_type in ["Empty", "ResponseOnly"]:
00111             cls = ros_loader.get_service_class("rosbridge_test/" + srv_type)
00112             for args in [[], {}, None]:
00113                 # Should throw no exceptions
00114                 services.args_to_service_request_instance("", cls._request_class(), args)
00115 
00116         # Test msgs with data message
00117         for srv_type in ["RequestOnly", "RequestAndResponse"]:
00118             cls = ros_loader.get_service_class("rosbridge_test/" + srv_type)
00119             for args in [[3], {"data": 3}]:
00120                 # Should throw no exceptions
00121                 services.args_to_service_request_instance("", cls._request_class(), args)
00122             self.assertRaises(FieldTypeMismatchException, services.args_to_service_request_instance, "", cls._request_class(), ["hello"])
00123 
00124         # Test message with multiple fields
00125         cls = ros_loader.get_service_class("rosbridge_test/MultipleRequestFields")
00126         for args in [[3, 3.5, "hello", False], {"int": 3, "float": 3.5, "string": "hello", "bool": False}]:
00127             # Should throw no exceptions
00128             services.args_to_service_request_instance("", cls._request_class(), args)
00129 
00130     def test_service_call(self):
00131         """ Test a simple getloggers service call """
00132         # First, call the service the 'proper' way
00133         p = rospy.ServiceProxy("/rosout/get_loggers", GetLoggers)
00134         ret = p()
00135 
00136         # Now, call using the services
00137         json_ret = services.call_service("/rosout/get_loggers")
00138         for x, y in zip(ret.loggers, json_ret["loggers"]):
00139             self.assertEqual(x.name, y["name"])
00140             self.assertEqual(x.level, y["level"])
00141 
00142     def test_service_caller(self):
00143         """ Same as test_service_call but via the thread caller """
00144         # First, call the service the 'proper' way
00145         p = rospy.ServiceProxy("/rosout/get_loggers", GetLoggers)
00146         ret = p()
00147 
00148         rcvd = {"json": None}
00149 
00150         def success(json):
00151             rcvd["json"] = json
00152 
00153         def error():
00154             raise Exception()
00155 
00156         # Now, call using the services
00157         services.ServiceCaller("/rosout/get_loggers", None, success, error).start()
00158 
00159         time.sleep(0.5)
00160 
00161         for x, y in zip(ret.loggers, rcvd["json"]["loggers"]):
00162             self.assertEqual(x.name, y["name"])
00163             self.assertEqual(x.level, y["level"])
00164 
00165     def test_service_tester(self):
00166         t = ServiceTester("/test_service_tester", "rosbridge_test/RequestAndResponse")
00167         t.start()
00168         time.sleep(1.0)
00169         t.validate(self.msgs_equal)
00170 
00171     def test_service_tester_alltypes(self):
00172         ts = []
00173         for srv in ["ResponseOnly", "Empty", "RequestAndResponse", "RequestOnly",
00174             "MultipleResponseFields", "MultipleRequestFields", "ArrayRequest"]:
00175             t = ServiceTester("/test_service_tester_alltypes_" + srv, "rosbridge_test/" + srv)
00176             t.start()
00177             ts.append(t)
00178 
00179         time.sleep(1.0)
00180 
00181         for t in ts:
00182             t.validate(self.msgs_equal)
00183 
00184     def test_random_service_types(self):
00185         common = ["roscpp/GetLoggers", "roscpp/SetLoggerLevel",
00186         "std_srvs/Empty", "nav_msgs/GetMap", "nav_msgs/GetPlan",
00187         "sensor_msgs/SetCameraInfo", "topic_tools/MuxAdd",
00188         "topic_tools/MuxSelect", "tf2_msgs/FrameGraph",
00189         "rospy_tutorials/BadTwoInts", "rospy_tutorials/AddTwoInts"]
00190         ts = []
00191         for srv in common:
00192             t = ServiceTester("/test_random_service_types/" + srv, srv)
00193             t.start()
00194             ts.append(t)
00195 
00196         time.sleep(1.0)
00197 
00198         for t in ts:
00199             t.validate(self.msgs_equal)
00200 
00201 
00202 if __name__ == '__main__':
00203     import rostest
00204     rostest.rosrun(PKG, 'test_services', TestServices)


rosbridge_test
Author(s): Jonathan Mace
autogenerated on Thu Jan 2 2014 11:54:04