test_call_service.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 import time
00007 
00008 from roscpp.srv import GetLoggers
00009 
00010 from json import loads, dumps
00011 from std_msgs.msg import String
00012 
00013 from rosbridge_library.capabilities.call_service import CallService
00014 from rosbridge_library.protocol import Protocol
00015 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
00016 
00017 
class TestCallService(unittest.TestCase):
    """Tests for the rosbridge ``call_service`` capability.

    Each test builds a ``Protocol`` and a ``CallService`` capability, feeds
    the capability a ``call_service`` message (round-tripped through JSON),
    and checks either the exception raised or the response handed to
    ``proto.send``.
    """

    # Upper bound, in seconds, on how long a test waits for the response that
    # the capability hands to ``proto.send``.
    RESPONSE_TIMEOUT = 5.0

    # Delay, in seconds, between two checks for the response.
    POLL_INTERVAL = 0.05

    def setUp(self):
        """Initialise the ROS node used by every test."""
        rospy.init_node("test_call_service")

    def _wait_for_response(self, received, timeout=None):
        """Wait until ``received["msg"]`` is set or the timeout expires.

        The original tests slept a fixed 0.5 s before reading the response,
        which is both slower than needed on a fast machine and too short on a
        loaded one. Polling returns as soon as the response is available.

        :param received: dict whose ``"msg"`` entry is filled in by the
            ``proto.send`` replacement installed by the test.
        :param timeout: maximum time to wait, in seconds; defaults to
            ``RESPONSE_TIMEOUT``.
        :returns: the received message, or ``None`` if nothing arrived in time.
        """
        if timeout is None:
            timeout = self.RESPONSE_TIMEOUT
        deadline = time.time() + timeout
        while received["msg"] is None and time.time() < deadline:
            time.sleep(self.POLL_INTERVAL)
        return received["msg"]

    def test_missing_arguments(self):
        """A message carrying only ``op`` raises MissingArgumentException."""
        proto = Protocol("test_missing_arguments")
        s = CallService(proto)
        msg = loads(dumps({"op": "call_service"}))
        self.assertRaises(MissingArgumentException, s.call_service, msg)

    def test_invalid_arguments(self):
        """An integer ``service`` value raises InvalidArgumentException."""
        proto = Protocol("test_invalid_arguments")
        s = CallService(proto)

        msg = loads(dumps({"op": "call_service", "service": 3}))
        self.assertRaises(InvalidArgumentException, s.call_service, msg)

    def test_call_service_works(self):
        """The bridged call returns the same loggers as a direct rospy call."""
        # First, call the service the 'proper' way to obtain reference values.
        p = rospy.ServiceProxy("/rosout/get_loggers", GetLoggers)
        ret = p()

        proto = Protocol("test_call_service_works")
        s = CallService(proto)
        msg = loads(dumps({"op": "call_service", "service": "/rosout/get_loggers"}))

        received = {"msg": None}

        def cb(msg, cid=None):
            received["msg"] = msg

        # Capture whatever the capability sends back instead of transmitting it.
        proto.send = cb

        s.call_service(msg)

        response = self._wait_for_response(received)
        # Fail with a clear message instead of a TypeError on None.
        self.assertIsNotNone(response, "no response received from call_service")
        self.assertTrue(response["result"])

        loggers = response["values"]["loggers"]
        # zip() stops at the shorter sequence, so check the lengths explicitly;
        # otherwise a short or empty reply would pass the loop below vacuously.
        self.assertEqual(len(ret.loggers), len(loggers))
        for x, y in zip(ret.loggers, loggers):
            self.assertEqual(x.name, y["name"])
            self.assertEqual(x.level, y["level"])

    def test_call_service_fail(self):
        """A set_logger_level call with these args reports ``result`` false."""
        proto = Protocol("test_call_service_fail")
        s = CallService(proto)
        send_msg = loads(dumps({"op": "call_service", "service": "/rosout/set_logger_level", "args": '["ros", "invalid"]'}))

        received = {"msg": None}

        def cb(msg, cid=None):
            received["msg"] = msg

        # Capture whatever the capability sends back instead of transmitting it.
        proto.send = cb

        s.call_service(send_msg)

        response = self._wait_for_response(received)
        # Fail with a clear message instead of a TypeError on None.
        self.assertIsNotNone(response, "no response received from call_service")
        self.assertFalse(response["result"])
00079 
00080 
# Package and test names passed to rostest when this file runs as a script.
PKG = "rosbridge_library"
NAME = "test_call_service"

if __name__ == "__main__":
    # Hand the test case to rostest's unit-test runner.
    rostest.unitrun(PKG, NAME, TestCallService)
00085 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Mon Oct 6 2014 06:58:09