8 from json
import loads, dumps
16 PKG =
'rosbridge_library'
17 NAME =
'test_subscribe'
29 """ Adds a bunch of random clients to the subscription and sees whether
30 the correct parameters are chosen as the min """
31 client_id =
"client_test_update_params"
32 topic =
"/test_update_params"
33 msg_type =
"std_msgs/String"
35 subscription = subscribe.Subscription(client_id, topic,
None)
41 for throttle_rate
in range(min_throttle_rate, min_throttle_rate + 10):
42 for queue_length
in range(min_queue_length, min_queue_length + 10):
43 for frag_size
in range(min_frag_size, min_frag_size + 10):
44 sid = throttle_rate * 100 + queue_length * 10 + frag_size
45 subscription.subscribe(sid, msg_type, throttle_rate,
46 queue_length, frag_size)
48 subscription.update_params()
51 self.assertEqual(subscription.throttle_rate, min_throttle_rate)
52 self.assertEqual(subscription.queue_length, min_queue_length)
53 self.assertEqual(subscription.fragment_size, min_frag_size)
54 self.assertEqual(subscription.compression,
"none")
56 list(subscription.clients.values())[0][
"compression"] =
"png"
58 subscription.update_params()
60 self.assertEqual(subscription.throttle_rate, min_throttle_rate)
61 self.assertEqual(subscription.queue_length, min_queue_length)
62 self.assertEqual(subscription.fragment_size, min_frag_size)
63 self.assertEqual(subscription.compression,
"png")
65 subscription.unregister()
68 subscription.unregister()
71 proto =
Protocol(
"test_missing_arguments")
72 sub = subscribe.Subscribe(proto)
73 msg = {
"op":
"subscribe"}
74 self.assertRaises(MissingArgumentException, sub.subscribe, msg)
77 proto =
Protocol(
"test_invalid_arguments")
78 sub = subscribe.Subscribe(proto)
80 msg = {
"op":
"subscribe",
"topic": 3}
81 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
83 msg = {
"op":
"subscribe",
"topic":
"/jon",
"type": 3}
84 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
86 msg = {
"op":
"subscribe",
"topic":
"/jon",
"throttle_rate":
"fast"}
87 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
89 msg = {
"op":
"subscribe",
"topic":
"/jon",
"fragment_size":
"five cubits"}
90 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
92 msg = {
"op":
"subscribe",
"topic":
"/jon",
"queue_length":
"long"}
93 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
95 msg = {
"op":
"subscribe",
"topic":
"/jon",
"compression": 9000}
96 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
99 proto =
Protocol(
"test_subscribe_works")
100 sub = subscribe.Subscribe(proto)
101 topic =
"/test_subscribe_works"
103 msg.data =
"test test_subscribe_works works"
104 msg_type =
"std_msgs/String"
106 received = {
"msg":
None}
108 def send(outgoing, **kwargs):
109 received[
"msg"] = outgoing
113 sub.subscribe(
loads(
dumps({
"op":
"subscribe",
"topic": topic,
"type": msg_type})))
115 p = rospy.Publisher(topic, String, queue_size=5)
120 self.assertEqual(received[
"msg"][
"msg"][
"data"], msg.data)
123 if __name__ ==
'__main__':
124 rosunit.unitrun(PKG, NAME, TestSubscribe)