8 from json
import loads, dumps
19 rospy.init_node(
"test_subscribe")
25 """ Adds a bunch of random clients to the subscription and sees whether 26 the correct parameters are chosen as the min """ 27 client_id =
"client_test_update_params" 28 topic =
"/test_update_params" 29 msg_type =
"std_msgs/String" 31 subscription = subscribe.Subscription(client_id, topic,
None)
37 for throttle_rate
in range(min_throttle_rate, min_throttle_rate + 10):
38 for queue_length
in range(min_queue_length, min_queue_length + 10):
39 for frag_size
in range(min_frag_size, min_frag_size + 10):
40 sid = throttle_rate * 100 + queue_length * 10 + frag_size
41 subscription.subscribe(sid, msg_type, throttle_rate,
42 queue_length, frag_size)
44 subscription.update_params()
47 self.assertEqual(subscription.throttle_rate, min_throttle_rate)
48 self.assertEqual(subscription.queue_length, min_queue_length)
49 self.assertEqual(subscription.fragment_size, min_frag_size)
50 self.assertEqual(subscription.compression,
"none")
52 list(subscription.clients.values())[0][
"compression"] =
"png" 54 subscription.update_params()
56 self.assertEqual(subscription.throttle_rate, min_throttle_rate)
57 self.assertEqual(subscription.queue_length, min_queue_length)
58 self.assertEqual(subscription.fragment_size, min_frag_size)
59 self.assertEqual(subscription.compression,
"png")
61 subscription.unregister()
64 subscription.unregister()
67 proto =
Protocol(
"test_missing_arguments")
68 sub = subscribe.Subscribe(proto)
69 msg = {
"op":
"subscribe"}
70 self.assertRaises(MissingArgumentException, sub.subscribe, msg)
73 proto =
Protocol(
"test_invalid_arguments")
74 sub = subscribe.Subscribe(proto)
76 msg = {
"op":
"subscribe",
"topic": 3}
77 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
79 msg = {
"op":
"subscribe",
"topic":
"/jon",
"type": 3}
80 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
82 msg = {
"op":
"subscribe",
"topic":
"/jon",
"throttle_rate":
"fast"}
83 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
85 msg = {
"op":
"subscribe",
"topic":
"/jon",
"fragment_size":
"five cubits"}
86 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
88 msg = {
"op":
"subscribe",
"topic":
"/jon",
"queue_length":
"long"}
89 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
91 msg = {
"op":
"subscribe",
"topic":
"/jon",
"compression": 9000}
92 self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
95 proto =
Protocol(
"test_subscribe_works")
96 sub = subscribe.Subscribe(proto)
97 topic =
"/test_subscribe_works" 99 msg.data =
"test test_subscribe_works works" 100 msg_type =
"std_msgs/String" 102 received = {
"msg":
None}
105 received[
"msg"] = outgoing
109 sub.subscribe(
loads(
dumps({
"op":
"subscribe",
"topic": topic,
"type": msg_type})))
111 p = rospy.Publisher(topic, String, queue_size=5)
116 self.assertEqual(received[
"msg"][
"msg"][
"data"], msg.data)
119 PKG =
'rosbridge_library' 120 NAME =
'test_subscribe' 121 if __name__ ==
'__main__':
122 rostest.unitrun(PKG, NAME, TestSubscribe)
def dumps(ob, sort_keys=False)
def test_subscribe_works(self)
def test_missing_arguments(self)
def test_invalid_arguments(self)
def test_update_params(self)