Go to the documentation of this file.00001
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 import time
00007
00008 from roscpp.srv import GetLoggers
00009
00010 from json import loads, dumps
00011 from std_msgs.msg import String
00012
00013 from rosbridge_library.capabilities.call_service import CallService
00014 from rosbridge_library.protocol import Protocol
00015 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
00016
00017
class TestCallService(unittest.TestCase):
    """Tests for the rosbridge ``CallService`` capability.

    The service tests talk to services under ``/rosout`` and therefore need a
    running ROS graph. The capability's reply is captured by replacing
    ``Protocol.send`` on the protocol instance with a recording callback.
    """

    # Upper bound (seconds) on how long a test waits for the service reply.
    RESPONSE_TIMEOUT = 5.0
    # Pause (seconds) between two looks at the captured message.
    POLL_INTERVAL = 0.05

    def setUp(self):
        """Initialise the ROS node before each test."""
        rospy.init_node("test_call_service")

    def _call_and_wait(self, client_id, message):
        """Send ``message`` through ``CallService`` and return the reply.

        A fresh ``Protocol`` named ``client_id`` is created and its ``send``
        is replaced by a callback that records the most recent outgoing
        message. The method then polls until a recorded message carrying a
        ``"result"`` key shows up, instead of sleeping for a fixed time and
        hoping the reply has arrived.

        NOTE(review): the previous version slept before reading the reply, so
        the reply is presumably delivered from another thread - confirm
        against ``CallService``.

        Fails the running test if no such message arrives within
        ``RESPONSE_TIMEOUT`` seconds.
        """
        proto = Protocol(client_id)
        capability = CallService(proto)
        received = {"msg": None}

        def cb(msg, cid=None):
            received["msg"] = msg

        proto.send = cb
        capability.call_service(message)

        deadline = time.time() + self.RESPONSE_TIMEOUT
        while True:
            latest = received["msg"]
            # Only a message with a "result" key is the reply the tests
            # inspect; anything else that may pass through send() is skipped.
            if latest is not None and "result" in latest:
                return latest
            if time.time() >= deadline:
                break
            time.sleep(self.POLL_INTERVAL)

        self.fail("no service response within %.1f s (last message: %r)"
                  % (self.RESPONSE_TIMEOUT, latest))

    def test_missing_arguments(self):
        """A call without a ``service`` field raises MissingArgumentException."""
        proto = Protocol("test_missing_arguments")
        s = CallService(proto)
        msg = loads(dumps({"op": "call_service"}))
        self.assertRaises(MissingArgumentException, s.call_service, msg)

    def test_invalid_arguments(self):
        """A non-string ``service`` field raises InvalidArgumentException."""
        proto = Protocol("test_invalid_arguments")
        s = CallService(proto)

        msg = loads(dumps({"op": "call_service", "service": 3}))
        self.assertRaises(InvalidArgumentException, s.call_service, msg)

    def test_call_service_works(self):
        """The bridged reply matches a direct call to ``/rosout/get_loggers``."""
        # Reference answer obtained straight from the service.
        p = rospy.ServiceProxy("/rosout/get_loggers", GetLoggers)
        ret = p()

        msg = loads(dumps({"op": "call_service", "service": "/rosout/get_loggers"}))
        response = self._call_and_wait("test_call_service_works", msg)

        self.assertTrue(response["result"])

        bridged_loggers = response["values"]["loggers"]
        # zip() stops at the shorter sequence, so without this guard an empty
        # or truncated reply would pass the comparison loop unnoticed.
        self.assertGreaterEqual(len(bridged_loggers), len(ret.loggers))
        for x, y in zip(ret.loggers, bridged_loggers):
            self.assertEqual(x.name, y["name"])
            self.assertEqual(x.level, y["level"])

    def test_call_service_fail(self):
        """A call with invalid arguments yields a reply whose result is false."""
        send_msg = loads(dumps({"op": "call_service", "service": "/rosout/set_logger_level", "args": '["ros", "invalid"]'}))
        response = self._call_and_wait("test_call_service_fail", send_msg)

        self.assertFalse(response["result"])
00079
00080
# Package and test name handed to rostest when this file is run directly.
PKG = "rosbridge_library"
NAME = "test_call_service"

if __name__ == "__main__":
    # Run the test case through rostest's unit-test runner.
    rostest.unitrun(PKG, NAME, TestCallService)
00085