00001
00002 PKG = 'rosbridge_library'
00003 import roslib
00004 roslib.load_manifest(PKG)
00005 roslib.load_manifest("std_msgs")
00006 import rospy
00007
00008 import unittest
00009 import time
00010
00011 from json import loads, dumps
00012 from std_msgs.msg import String
00013
00014 from rosbridge_library.capabilities import subscribe
00015 from rosbridge_library.protocol import Protocol
00016 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
00017
00018
00019 class TestSubscribe(unittest.TestCase):
00020
00021 def setUp(self):
00022 rospy.init_node("test_subscribe")
00023
00024 def dummy_cb(self, msg):
00025 pass
00026
00027 def test_update_params(self):
00028 """ Adds a bunch of random clients to the subscription and sees whether
00029 the correct parameters are chosen as the min """
00030 client_id = "client_test_update_params"
00031 topic = "/test_update_params"
00032 msg_type = "std_msgs/String"
00033
00034 subscription = subscribe.Subscription(client_id, topic, None)
00035
00036 min_throttle_rate = 5
00037 min_queue_length = 2
00038 min_frag_size = 20
00039
00040 for throttle_rate in range(min_throttle_rate, min_throttle_rate + 10):
00041 for queue_length in range(min_queue_length, min_queue_length + 10):
00042 for frag_size in range(min_frag_size, min_frag_size + 10):
00043 sid = throttle_rate * 100 + queue_length * 10 + frag_size
00044 subscription.subscribe(sid, msg_type, throttle_rate,
00045 queue_length, frag_size)
00046
00047 subscription.update_params()
00048
00049 try:
00050 self.assertEqual(subscription.throttle_rate, min_throttle_rate)
00051 self.assertEqual(subscription.queue_length, min_queue_length)
00052 self.assertEqual(subscription.fragment_size, min_frag_size)
00053 self.assertEqual(subscription.compression, "none")
00054
00055 subscription.clients.values()[0]["compression"] = "png"
00056
00057 subscription.update_params()
00058
00059 self.assertEqual(subscription.throttle_rate, min_throttle_rate)
00060 self.assertEqual(subscription.queue_length, min_queue_length)
00061 self.assertEqual(subscription.fragment_size, min_frag_size)
00062 self.assertEqual(subscription.compression, "png")
00063 except:
00064 subscription.unregister()
00065 raise
00066
00067 subscription.unregister()
00068
00069 def test_missing_arguments(self):
00070 proto = Protocol("test_missing_arguments")
00071 sub = subscribe.Subscribe(proto)
00072 msg = {"op": "subscribe"}
00073 self.assertRaises(MissingArgumentException, sub.subscribe, msg)
00074
00075 def test_invalid_arguments(self):
00076 proto = Protocol("test_invalid_arguments")
00077 sub = subscribe.Subscribe(proto)
00078
00079 msg = {"op": "subscribe", "topic": 3}
00080 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00081
00082 msg = {"op": "subscribe", "topic": "/jon", "type": 3}
00083 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00084
00085 msg = {"op": "subscribe", "topic": "/jon", "throttle_rate": "fast"}
00086 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00087
00088 msg = {"op": "subscribe", "topic": "/jon", "fragment_size": "five cubits"}
00089 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00090
00091 msg = {"op": "subscribe", "topic": "/jon", "queue_length": "long"}
00092 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00093
00094 msg = {"op": "subscribe", "topic": "/jon", "compression": 9000}
00095 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00096
00097 def test_subscribe_works(self):
00098 proto = Protocol("test_subscribe_works")
00099 sub = subscribe.Subscribe(proto)
00100 topic = "/test_subscribe_works"
00101 msg = String()
00102 msg.data = "test test_subscribe_works works"
00103 msg_type = "std_msgs/String"
00104
00105 received = {"msg": None}
00106
00107 def send(outgoing):
00108 received["msg"] = outgoing
00109
00110 proto.send = send
00111
00112 sub.subscribe(loads(dumps({"op": "subscribe", "topic": topic, "type": msg_type})))
00113
00114 p = rospy.Publisher(topic, String)
00115 time.sleep(0.25)
00116 p.publish(msg)
00117
00118 time.sleep(0.25)
00119 self.assertEqual(received["msg"]["msg"]["data"], msg.data)
00120
00121 if __name__ == '__main__':
00122 import rostest
00123 rostest.rosrun(PKG, 'test_subscribe', TestSubscribe)