00001
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 import time
00007
00008 from json import loads, dumps
00009 from std_msgs.msg import String
00010
00011 from rosbridge_library.capabilities import subscribe
00012 from rosbridge_library.protocol import Protocol
00013 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
00014
00015
00016 class TestSubscribe(unittest.TestCase):
00017
00018 def setUp(self):
00019 rospy.init_node("test_subscribe")
00020
00021 def dummy_cb(self, msg):
00022 pass
00023
00024 def test_update_params(self):
00025 """ Adds a bunch of random clients to the subscription and sees whether
00026 the correct parameters are chosen as the min """
00027 client_id = "client_test_update_params"
00028 topic = "/test_update_params"
00029 msg_type = "std_msgs/String"
00030
00031 subscription = subscribe.Subscription(client_id, topic, None)
00032
00033 min_throttle_rate = 5
00034 min_queue_length = 2
00035 min_frag_size = 20
00036
00037 for throttle_rate in range(min_throttle_rate, min_throttle_rate + 10):
00038 for queue_length in range(min_queue_length, min_queue_length + 10):
00039 for frag_size in range(min_frag_size, min_frag_size + 10):
00040 sid = throttle_rate * 100 + queue_length * 10 + frag_size
00041 subscription.subscribe(sid, msg_type, throttle_rate,
00042 queue_length, frag_size)
00043
00044 subscription.update_params()
00045
00046 try:
00047 self.assertEqual(subscription.throttle_rate, min_throttle_rate)
00048 self.assertEqual(subscription.queue_length, min_queue_length)
00049 self.assertEqual(subscription.fragment_size, min_frag_size)
00050 self.assertEqual(subscription.compression, "none")
00051
00052 list(subscription.clients.values())[0]["compression"] = "png"
00053
00054 subscription.update_params()
00055
00056 self.assertEqual(subscription.throttle_rate, min_throttle_rate)
00057 self.assertEqual(subscription.queue_length, min_queue_length)
00058 self.assertEqual(subscription.fragment_size, min_frag_size)
00059 self.assertEqual(subscription.compression, "png")
00060 except:
00061 subscription.unregister()
00062 raise
00063
00064 subscription.unregister()
00065
00066 def test_missing_arguments(self):
00067 proto = Protocol("test_missing_arguments")
00068 sub = subscribe.Subscribe(proto)
00069 msg = {"op": "subscribe"}
00070 self.assertRaises(MissingArgumentException, sub.subscribe, msg)
00071
00072 def test_invalid_arguments(self):
00073 proto = Protocol("test_invalid_arguments")
00074 sub = subscribe.Subscribe(proto)
00075
00076 msg = {"op": "subscribe", "topic": 3}
00077 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00078
00079 msg = {"op": "subscribe", "topic": "/jon", "type": 3}
00080 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00081
00082 msg = {"op": "subscribe", "topic": "/jon", "throttle_rate": "fast"}
00083 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00084
00085 msg = {"op": "subscribe", "topic": "/jon", "fragment_size": "five cubits"}
00086 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00087
00088 msg = {"op": "subscribe", "topic": "/jon", "queue_length": "long"}
00089 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00090
00091 msg = {"op": "subscribe", "topic": "/jon", "compression": 9000}
00092 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00093
00094 def test_subscribe_works(self):
00095 proto = Protocol("test_subscribe_works")
00096 sub = subscribe.Subscribe(proto)
00097 topic = "/test_subscribe_works"
00098 msg = String()
00099 msg.data = "test test_subscribe_works works"
00100 msg_type = "std_msgs/String"
00101
00102 received = {"msg": None}
00103
00104 def send(outgoing):
00105 received["msg"] = outgoing
00106
00107 proto.send = send
00108
00109 sub.subscribe(loads(dumps({"op": "subscribe", "topic": topic, "type": msg_type})))
00110
00111 p = rospy.Publisher(topic, String, queue_size=5)
00112 time.sleep(0.25)
00113 p.publish(msg)
00114
00115 time.sleep(0.25)
00116 self.assertEqual(received["msg"]["msg"]["data"], msg.data)
00117
00118
00119 PKG = 'rosbridge_library'
00120 NAME = 'test_subscribe'
00121 if __name__ == '__main__':
00122 rostest.unitrun(PKG, NAME, TestSubscribe)
00123