test_subscribe.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import rospy
4 import rosunit
5 import unittest
6 import time
7 
8 from json import loads, dumps
9 from std_msgs.msg import String
10 
11 from rosbridge_library.capabilities import subscribe
12 from rosbridge_library.protocol import Protocol
13 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
14 
15 
16 PKG = 'rosbridge_library'
17 NAME = 'test_subscribe'
18 
19 
20 class TestSubscribe(unittest.TestCase):
21 
22  def setUp(self):
23  rospy.init_node(NAME)
24 
25  def dummy_cb(self, msg):
26  pass
27 
28  def test_update_params(self):
29  """ Adds a bunch of random clients to the subscription and sees whether
30  the correct parameters are chosen as the min """
31  client_id = "client_test_update_params"
32  topic = "/test_update_params"
33  msg_type = "std_msgs/String"
34 
35  subscription = subscribe.Subscription(client_id, topic, None)
36 
37  min_throttle_rate = 5
38  min_queue_length = 2
39  min_frag_size = 20
40 
41  for throttle_rate in range(min_throttle_rate, min_throttle_rate + 10):
42  for queue_length in range(min_queue_length, min_queue_length + 10):
43  for frag_size in range(min_frag_size, min_frag_size + 10):
44  sid = throttle_rate * 100 + queue_length * 10 + frag_size
45  subscription.subscribe(sid, msg_type, throttle_rate,
46  queue_length, frag_size)
47 
48  subscription.update_params()
49 
50  try:
51  self.assertEqual(subscription.throttle_rate, min_throttle_rate)
52  self.assertEqual(subscription.queue_length, min_queue_length)
53  self.assertEqual(subscription.fragment_size, min_frag_size)
54  self.assertEqual(subscription.compression, "none")
55 
56  list(subscription.clients.values())[0]["compression"] = "png"
57 
58  subscription.update_params()
59 
60  self.assertEqual(subscription.throttle_rate, min_throttle_rate)
61  self.assertEqual(subscription.queue_length, min_queue_length)
62  self.assertEqual(subscription.fragment_size, min_frag_size)
63  self.assertEqual(subscription.compression, "png")
64  except:
65  subscription.unregister()
66  raise
67 
68  subscription.unregister()
69 
71  proto = Protocol("test_missing_arguments")
72  sub = subscribe.Subscribe(proto)
73  msg = {"op": "subscribe"}
74  self.assertRaises(MissingArgumentException, sub.subscribe, msg)
75 
77  proto = Protocol("test_invalid_arguments")
78  sub = subscribe.Subscribe(proto)
79 
80  msg = {"op": "subscribe", "topic": 3}
81  self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
82 
83  msg = {"op": "subscribe", "topic": "/jon", "type": 3}
84  self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
85 
86  msg = {"op": "subscribe", "topic": "/jon", "throttle_rate": "fast"}
87  self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
88 
89  msg = {"op": "subscribe", "topic": "/jon", "fragment_size": "five cubits"}
90  self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
91 
92  msg = {"op": "subscribe", "topic": "/jon", "queue_length": "long"}
93  self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
94 
95  msg = {"op": "subscribe", "topic": "/jon", "compression": 9000}
96  self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
97 
99  proto = Protocol("test_subscribe_works")
100  sub = subscribe.Subscribe(proto)
101  topic = "/test_subscribe_works"
102  msg = String()
103  msg.data = "test test_subscribe_works works"
104  msg_type = "std_msgs/String"
105 
106  received = {"msg": None}
107 
108  def send(outgoing, **kwargs):
109  received["msg"] = outgoing
110 
111  proto.send = send
112 
113  sub.subscribe(loads(dumps({"op": "subscribe", "topic": topic, "type": msg_type})))
114 
115  p = rospy.Publisher(topic, String, queue_size=5)
116  time.sleep(0.25)
117  p.publish(msg)
118 
119  time.sleep(0.25)
120  self.assertEqual(received["msg"]["msg"]["data"], msg.data)
121 
122 
123 if __name__ == '__main__':
124  rosunit.unitrun(PKG, NAME, TestSubscribe)
125 
def dumps(ob, sort_keys=False)
Definition: cbor.py:223


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri Oct 21 2022 02:45:18