Go to the documentation of this file.00001
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 import time
00007
00008 from roscpp.srv import GetLoggers
00009
00010 from json import loads, dumps
00011 from std_msgs.msg import String
00012
00013 from rosbridge_library.capabilities.call_service import CallService
00014 from rosbridge_library.protocol import Protocol
00015 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
00016
00017
00018 class TestCallService(unittest.TestCase):
00019
00020 def setUp(self):
00021 rospy.init_node("test_call_service")
00022
00023 def test_missing_arguments(self):
00024 proto = Protocol("test_missing_arguments")
00025 s = CallService(proto)
00026 msg = loads(dumps({"op": "call_service"}))
00027 self.assertRaises(MissingArgumentException, s.call_service, msg)
00028
00029 def test_invalid_arguments(self):
00030 proto = Protocol("test_invalid_arguments")
00031 s = CallService(proto)
00032
00033 msg = loads(dumps({"op": "call_service", "service": 3}))
00034 self.assertRaises(InvalidArgumentException, s.call_service, msg)
00035
00036 def test_call_service_works(self):
00037
00038 p = rospy.ServiceProxy("/rosout/get_loggers", GetLoggers)
00039 p.wait_for_service()
00040 time.sleep(1.0)
00041 ret = p()
00042
00043 proto = Protocol("test_call_service_works")
00044 s = CallService(proto)
00045 msg = loads(dumps({"op": "call_service", "service": "/rosout/get_loggers"}))
00046
00047 received = {"msg": None, "arrived": False}
00048
00049 def cb(msg, cid=None):
00050 received["msg"] = msg
00051 received["arrived"] = True
00052
00053 proto.send = cb
00054
00055 s.call_service(msg)
00056
00057 timeout = 5.0
00058 start = rospy.Time.now()
00059 while rospy.Time.now() - start < rospy.Duration(timeout):
00060 if received["arrived"]:
00061 break
00062 time.sleep(0.1)
00063
00064 self.assertTrue(received["msg"]["result"])
00065 for x, y in zip(ret.loggers, received["msg"]["values"]["loggers"]):
00066 self.assertEqual(x.name, y["name"])
00067 self.assertEqual(x.level, y["level"])
00068
00069 def test_call_service_fail(self):
00070 proto = Protocol("test_call_service_fail")
00071 s = CallService(proto)
00072 send_msg = loads(dumps({"op": "call_service", "service": "/rosout/set_logger_level", "args": '["ros", "invalid"]'}))
00073
00074 received = {"msg": None, "arrived": False}
00075 def cb(msg, cid=None):
00076 received["msg"] = msg
00077 received["arrived"] = True
00078
00079 proto.send = cb
00080
00081 s.call_service(send_msg)
00082
00083 timeout = 5.0
00084 start = rospy.Time.now()
00085 while rospy.Time.now() - start < rospy.Duration(timeout):
00086 if received["arrived"]:
00087 break
00088 time.sleep(0.1)
00089
00090 self.assertFalse(received["msg"]["result"])
00091
00092
00093 PKG = 'rosbridge_library'
00094 NAME = 'test_call_service'
00095 if __name__ == '__main__':
00096 rostest.unitrun(PKG, NAME, TestCallService)
00097