test_subscribe.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import rospy
4 import rostest
5 import unittest
6 import time
7 
8 from json import loads, dumps
9 from std_msgs.msg import String
10 
11 from rosbridge_library.capabilities import subscribe
12 from rosbridge_library.protocol import Protocol
13 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
14 
15 
16 class TestSubscribe(unittest.TestCase):
17 
18  def setUp(self):
19  rospy.init_node("test_subscribe")
20 
21  def dummy_cb(self, msg):
22  pass
23 
24  def test_update_params(self):
25  """ Adds a bunch of random clients to the subscription and sees whether
26  the correct parameters are chosen as the min """
27  client_id = "client_test_update_params"
28  topic = "/test_update_params"
29  msg_type = "std_msgs/String"
30 
31  subscription = subscribe.Subscription(client_id, topic, None)
32 
33  min_throttle_rate = 5
34  min_queue_length = 2
35  min_frag_size = 20
36 
37  for throttle_rate in range(min_throttle_rate, min_throttle_rate + 10):
38  for queue_length in range(min_queue_length, min_queue_length + 10):
39  for frag_size in range(min_frag_size, min_frag_size + 10):
40  sid = throttle_rate * 100 + queue_length * 10 + frag_size
41  subscription.subscribe(sid, msg_type, throttle_rate,
42  queue_length, frag_size)
43 
44  subscription.update_params()
45 
46  try:
47  self.assertEqual(subscription.throttle_rate, min_throttle_rate)
48  self.assertEqual(subscription.queue_length, min_queue_length)
49  self.assertEqual(subscription.fragment_size, min_frag_size)
50  self.assertEqual(subscription.compression, "none")
51 
52  list(subscription.clients.values())[0]["compression"] = "png"
53 
54  subscription.update_params()
55 
56  self.assertEqual(subscription.throttle_rate, min_throttle_rate)
57  self.assertEqual(subscription.queue_length, min_queue_length)
58  self.assertEqual(subscription.fragment_size, min_frag_size)
59  self.assertEqual(subscription.compression, "png")
60  except:
61  subscription.unregister()
62  raise
63 
64  subscription.unregister()
65 
67  proto = Protocol("test_missing_arguments")
68  sub = subscribe.Subscribe(proto)
69  msg = {"op": "subscribe"}
70  self.assertRaises(MissingArgumentException, sub.subscribe, msg)
71 
73  proto = Protocol("test_invalid_arguments")
74  sub = subscribe.Subscribe(proto)
75 
76  msg = {"op": "subscribe", "topic": 3}
77  self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
78 
79  msg = {"op": "subscribe", "topic": "/jon", "type": 3}
80  self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
81 
82  msg = {"op": "subscribe", "topic": "/jon", "throttle_rate": "fast"}
83  self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
84 
85  msg = {"op": "subscribe", "topic": "/jon", "fragment_size": "five cubits"}
86  self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
87 
88  msg = {"op": "subscribe", "topic": "/jon", "queue_length": "long"}
89  self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
90 
91  msg = {"op": "subscribe", "topic": "/jon", "compression": 9000}
92  self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
93 
95  proto = Protocol("test_subscribe_works")
96  sub = subscribe.Subscribe(proto)
97  topic = "/test_subscribe_works"
98  msg = String()
99  msg.data = "test test_subscribe_works works"
100  msg_type = "std_msgs/String"
101 
102  received = {"msg": None}
103 
104  def send(outgoing):
105  received["msg"] = outgoing
106 
107  proto.send = send
108 
109  sub.subscribe(loads(dumps({"op": "subscribe", "topic": topic, "type": msg_type})))
110 
111  p = rospy.Publisher(topic, String, queue_size=5)
112  time.sleep(0.25)
113  p.publish(msg)
114 
115  time.sleep(0.25)
116  self.assertEqual(received["msg"]["msg"]["data"], msg.data)
117 
118 
119 PKG = 'rosbridge_library'
120 NAME = 'test_subscribe'
121 if __name__ == '__main__':
122  rostest.unitrun(PKG, NAME, TestSubscribe)
123 
def dumps(ob, sort_keys=False)
Definition: cbor.py:223


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Wed Jun 3 2020 03:55:14