test_call_service.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 import time
00007 
00008 from roscpp.srv import GetLoggers
00009 
00010 from json import loads, dumps
00011 from std_msgs.msg import String
00012 
00013 from rosbridge_library.capabilities.call_service import CallService
00014 from rosbridge_library.protocol import Protocol
00015 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
00016 
00017 
00018 class TestCallService(unittest.TestCase):
00019 
00020     def setUp(self):
00021         rospy.init_node("test_call_service")
00022 
00023     def test_missing_arguments(self):
00024         proto = Protocol("test_missing_arguments")
00025         s = CallService(proto)
00026         msg = loads(dumps({"op": "call_service"}))
00027         self.assertRaises(MissingArgumentException, s.call_service, msg)
00028 
00029     def test_invalid_arguments(self):
00030         proto = Protocol("test_invalid_arguments")
00031         s = CallService(proto)
00032 
00033         msg = loads(dumps({"op": "call_service", "service": 3}))
00034         self.assertRaises(InvalidArgumentException, s.call_service, msg)
00035 
00036     def test_call_service_works(self):
00037         # First, call the service the 'proper' way
00038         p = rospy.ServiceProxy("/rosout/get_loggers", GetLoggers)
00039         p.wait_for_service()
00040         time.sleep(1.0)
00041         ret = p()
00042 
00043         proto = Protocol("test_call_service_works")
00044         s = CallService(proto)
00045         msg = loads(dumps({"op": "call_service", "service": "/rosout/get_loggers"}))
00046 
00047         received = {"msg": None, "arrived": False}
00048 
00049         def cb(msg, cid=None):
00050             received["msg"] = msg
00051             received["arrived"] = True
00052 
00053         proto.send = cb
00054 
00055         s.call_service(msg)
00056 
00057         timeout = 5.0
00058         start = rospy.Time.now()
00059         while rospy.Time.now() - start < rospy.Duration(timeout):
00060             if received["arrived"]:
00061                 break
00062             time.sleep(0.1)
00063 
00064         self.assertTrue(received["msg"]["result"])
00065         for x, y in zip(ret.loggers, received["msg"]["values"]["loggers"]):
00066             self.assertEqual(x.name, y["name"])
00067             self.assertEqual(x.level, y["level"])
00068 
00069     def test_call_service_fail(self):
00070         proto = Protocol("test_call_service_fail")
00071         s = CallService(proto)
00072         send_msg = loads(dumps({"op": "call_service", "service": "/rosout/set_logger_level", "args": '["ros", "invalid"]'}))
00073 
00074         received = {"msg": None, "arrived": False}
00075         def cb(msg, cid=None):
00076             received["msg"] = msg
00077             received["arrived"] = True
00078 
00079         proto.send = cb
00080 
00081         s.call_service(send_msg)
00082 
00083         timeout = 5.0
00084         start = rospy.Time.now()
00085         while rospy.Time.now() - start < rospy.Duration(timeout):
00086             if received["arrived"]:
00087                 break
00088             time.sleep(0.1)
00089 
00090         self.assertFalse(received["msg"]["result"])
00091 
00092 
00093 PKG = 'rosbridge_library'
00094 NAME = 'test_call_service'
00095 if __name__ == '__main__':
00096     rostest.unitrun(PKG, NAME, TestCallService)
00097 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Thu Jun 6 2019 21:51:43