test_publisher_manager.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import rospy
4 import rosunit
5 import unittest
6 
7 from time import sleep
8 
11 from rosbridge_library.internal.message_conversion import FieldTypeMismatchException
12 from std_msgs.msg import String, Int32
13 
14 
# Package and test name handed to rosunit.unitrun() at the bottom of this
# file; NAME is also used as the ROS node name in setUp().
PKG = 'rosbridge_library'
NAME = 'test_publisher_manager'
17 
18 
class TestPublisherManager(unittest.TestCase):
    """Tests for registering, unregistering and publishing via ``manager``.

    Every test talks to ROS through ``rospy`` (node initialisation and
    ``rospy.get_published_topics()``), so a ROS master must be reachable.

    Unregistering the last client of a topic is expected to be delayed: the
    topic first shows up in ``manager.unregister_timers`` and only disappears
    from ``manager._publishers`` after ``manager.unregister_timeout`` seconds.
    The tests therefore sleep slightly longer than that timeout before
    checking that a topic is gone.

    NOTE(review): ``manager``, ``TypeConflictException`` and
    ``TopicNotEstablishedException`` are used here but their imports are not
    visible in this file -- confirm they are imported at the top.
    NOTE(review): the ``test_*`` method names mirror the topic name each test
    uses -- confirm they match the upstream names.
    """

    def setUp(self):
        """Initialise the ROS node and set a one second unregister timeout."""
        rospy.init_node(NAME)

        # Tests sleep for 1.1x this value to let pending unregistrations run.
        manager.unregister_timeout = 1.0

    def is_topic_published(self, topicname):
        """Return True if ``rospy.get_published_topics()`` lists *topicname*."""
        return topicname in dict(rospy.get_published_topics())

    def test_register_publisher(self):
        """ Register a publisher on a clean topic with a good msg type """
        topic = "/test_register_publisher"
        msg_type = "std_msgs/String"
        client = "client_test_register_publisher"

        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))
        manager.register(client, topic, msg_type)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))

        # Right after unregistering, the topic is still up with a timer pending.
        manager.unregister(client, topic)
        self.assertIn(topic, manager.unregister_timers)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))
        sleep(manager.unregister_timeout * 1.1)
        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))
        self.assertNotIn(topic, manager.unregister_timers)

    def test_register_publisher_multiclient(self):
        """ Register one topic from two clients and unregister both """
        topic = "/test_register_publisher_multiclient"
        msg_type = "std_msgs/String"
        client1 = "client_test_register_publisher_1"
        client2 = "client_test_register_publisher_2"

        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))
        manager.register(client1, topic, msg_type)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))
        manager.register(client2, topic, msg_type)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))
        # One client leaving must not take the topic down.
        manager.unregister(client1, topic)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))
        manager.unregister(client2, topic)
        self.assertIn(topic, manager.unregister_timers)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))
        sleep(manager.unregister_timeout * 1.1)
        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))
        self.assertNotIn(topic, manager.unregister_timers)

    def test_register_publisher_conflicting_types(self):
        """ Registering a topic again with a different msg type must raise """
        topic = "/test_register_publisher_conflicting_types"
        msg_type = "std_msgs/String"
        msg_type_bad = "std_msgs/Int32"
        client = "client_test_register_publisher_conflicting_types"

        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))
        manager.register(client, topic, msg_type)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))

        self.assertRaises(TypeConflictException, manager.register, "client2", topic, msg_type_bad)

    def test_register_multiple_publishers(self):
        """ Register two topics from one client and unregister them in turn """
        topic1 = "/test_register_multiple_publishers1"
        topic2 = "/test_register_multiple_publishers2"
        msg_type = "std_msgs/String"
        client = "client_test_register_multiple_publishers"

        self.assertNotIn(topic1, manager._publishers)
        self.assertNotIn(topic2, manager._publishers)
        self.assertFalse(self.is_topic_published(topic1))
        self.assertFalse(self.is_topic_published(topic2))
        manager.register(client, topic1, msg_type)
        self.assertIn(topic1, manager._publishers)
        self.assertTrue(self.is_topic_published(topic1))
        self.assertNotIn(topic2, manager._publishers)
        self.assertFalse(self.is_topic_published(topic2))
        manager.register(client, topic2, msg_type)
        self.assertIn(topic1, manager._publishers)
        self.assertTrue(self.is_topic_published(topic1))
        self.assertIn(topic2, manager._publishers)
        self.assertTrue(self.is_topic_published(topic2))

        # Unregistering topic1 must leave topic2 untouched.
        manager.unregister(client, topic1)
        self.assertTrue(self.is_topic_published(topic1))
        self.assertIn(topic1, manager.unregister_timers)
        self.assertIn(topic2, manager._publishers)
        self.assertTrue(self.is_topic_published(topic2))

        manager.unregister(client, topic2)
        self.assertIn(topic2, manager.unregister_timers)
        self.assertTrue(self.is_topic_published(topic2))
        sleep(manager.unregister_timeout * 1.1)
        self.assertNotIn(topic1, manager._publishers)
        self.assertFalse(self.is_topic_published(topic1))
        self.assertNotIn(topic2, manager._publishers)
        self.assertFalse(self.is_topic_published(topic2))
        self.assertNotIn(topic1, manager.unregister_timers)
        self.assertNotIn(topic2, manager.unregister_timers)

    def test_register_no_msgtype(self):
        """ Registering an unpublished topic without a msg type must raise """
        topic = "/test_register_no_msgtype"
        client = "client_test_register_no_msgtype"

        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))
        self.assertRaises(TopicNotEstablishedException, manager.register, client, topic)

    def test_register_infer_topictype(self):
        """ Register without a msg type on a topic that is already published """
        topic = "/test_register_infer_topictype"
        client = "client_test_register_infer_topictype"

        self.assertFalse(self.is_topic_published(topic))

        # Publish the topic outside the manager so registration can succeed
        # without an explicit msg type.
        rospy.Publisher(topic, String)

        self.assertTrue(self.is_topic_published(topic))
        self.assertNotIn(topic, manager._publishers)
        manager.register(client, topic)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))

        manager.unregister(client, topic)
        self.assertIn(topic, manager.unregister_timers)
        self.assertTrue(self.is_topic_published(topic))

    def test_register_multiple_notopictype(self):
        """ A second client registers without a msg type on a registered topic """
        topic = "/test_register_multiple_notopictype"
        msg_type = "std_msgs/String"
        client1 = "client_test_register_multiple_notopictype_1"
        client2 = "client_test_register_multiple_notopictype_2"

        self.assertNotIn(topic, manager._publishers)
        self.assertNotIn(topic, manager.unregister_timers)
        self.assertFalse(self.is_topic_published(topic))
        manager.register(client1, topic, msg_type)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))
        manager.register(client2, topic)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))
        manager.unregister(client1, topic)
        self.assertIn(topic, manager._publishers)
        self.assertIn(topic, manager.unregister_timers)
        self.assertTrue(self.is_topic_published(topic))
        manager.unregister(client2, topic)
        self.assertIn(topic, manager.unregister_timers)
        self.assertIn(topic, manager._publishers)
        sleep(manager.unregister_timeout * 1.1)
        self.assertNotIn(topic, manager._publishers)
        self.assertNotIn(topic, manager.unregister_timers)
        self.assertFalse(self.is_topic_published(topic))

    def test_publish_not_registered(self):
        """ Publishing to an unregistered, unpublished topic must raise """
        topic = "/test_publish_not_registered"
        msg = {"data": "test publish not registered"}
        client = "client_test_publish_not_registered"

        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))
        self.assertRaises(TopicNotEstablishedException, manager.publish, client, topic, msg)

    def test_publisher_manager_publish(self):
        """ Make sure that publishing works """
        topic = "/test_publisher_manager_publish"
        msg = {"data": "test publisher manager publish"}
        client = "client_test_publisher_manager_publish"

        # Mutable holder so the subscriber callback can hand the message back.
        received = {"msg": None}

        def cb(ros_msg):
            received["msg"] = ros_msg

        rospy.Subscriber(topic, String, cb)
        manager.publish(client, topic, msg)
        # Give the message time to reach the subscriber callback.
        sleep(0.5)

        # Fail cleanly (instead of erroring on None.data) if nothing arrived.
        self.assertIsNotNone(received["msg"])
        self.assertEqual(received["msg"].data, msg["data"])

    def test_publisher_manager_bad_publish(self):
        """ Make sure that bad publishing fails """
        topic = "/test_publisher_manager_bad_publish"
        client = "client_test_publisher_manager_bad_publish"
        msg_type = "std_msgs/String"
        # An integer payload for a std_msgs/String topic.
        msg = {"data": 3}

        manager.register(client, topic, msg_type)
        self.assertRaises(FieldTypeMismatchException, manager.publish, client, topic, msg)
217 
if __name__ == '__main__':
    # Run the test case through rosunit when this file is executed as a script.
    rosunit.unitrun(PKG, NAME, TestPublisherManager)
220 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri Oct 21 2022 02:45:18