test_publisher_manager.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import rospy
4 import rostest
5 import unittest
6 
7 from time import sleep
8 
11 from rosbridge_library.internal.message_conversion import FieldTypeMismatchException
12 from std_msgs.msg import String, Int32
13 
14 
15 class TestPublisherManager(unittest.TestCase):
16 
17  def setUp(self):
18  rospy.init_node("test_publisher_manager")
19  manager.unregister_timeout = 1.0
20 
21  def is_topic_published(self, topicname):
22  return topicname in dict(rospy.get_published_topics()).keys()
23 
25  """ Register a publisher on a clean topic with a good msg type """
26  topic = "/test_register_publisher"
27  msg_type = "std_msgs/String"
28  client = "client_test_register_publisher"
29 
30  self.assertFalse(topic in manager._publishers)
31  self.assertFalse(self.is_topic_published(topic))
32  manager.register(client, topic, msg_type)
33  self.assertTrue(topic in manager._publishers)
34  self.assertTrue(self.is_topic_published(topic))
35 
36  manager.unregister(client, topic)
37  self.assertTrue(topic in manager.unregister_timers)
38  self.assertTrue(topic in manager._publishers)
39  self.assertTrue(self.is_topic_published(topic))
40  sleep(manager.unregister_timeout*1.1)
41  self.assertFalse(topic in manager._publishers)
42  self.assertFalse(self.is_topic_published(topic))
43  self.assertFalse(topic in manager.unregister_timers)
44 
46  topic = "/test_register_publisher_multiclient"
47  msg_type = "std_msgs/String"
48  client1 = "client_test_register_publisher_1"
49  client2 = "client_test_register_publisher_2"
50 
51  self.assertFalse(topic in manager._publishers)
52  self.assertFalse(self.is_topic_published(topic))
53  manager.register(client1, topic, msg_type)
54  self.assertTrue(topic in manager._publishers)
55  self.assertTrue(self.is_topic_published(topic))
56  manager.register(client2, topic, msg_type)
57  self.assertTrue(topic in manager._publishers)
58  self.assertTrue(self.is_topic_published(topic))
59  manager.unregister(client1, topic)
60  self.assertTrue(topic in manager._publishers)
61  self.assertTrue(self.is_topic_published(topic))
62  manager.unregister(client2, topic)
63  self.assertTrue(topic in manager.unregister_timers)
64  self.assertTrue(topic in manager._publishers)
65  self.assertTrue(self.is_topic_published(topic))
66  sleep(manager.unregister_timeout*1.1)
67  self.assertFalse(topic in manager._publishers)
68  self.assertFalse(self.is_topic_published(topic))
69  self.assertFalse(topic in manager.unregister_timers)
70 
72  topic = "/test_register_publisher_conflicting_types"
73  msg_type = "std_msgs/String"
74  msg_type_bad = "std_msgs/Int32"
75  client = "client_test_register_publisher_conflicting_types"
76 
77  self.assertFalse(topic in manager._publishers)
78  self.assertFalse(self.is_topic_published(topic))
79  manager.register(client, topic, msg_type)
80  self.assertTrue(topic in manager._publishers)
81  self.assertTrue(self.is_topic_published(topic))
82 
83  self.assertRaises(TypeConflictException, manager.register, "client2", topic, msg_type_bad)
84 
86  topic1 = "/test_register_multiple_publishers1"
87  topic2 = "/test_register_multiple_publishers2"
88  msg_type = "std_msgs/String"
89  client = "client_test_register_multiple_publishers"
90 
91  self.assertFalse(topic1 in manager._publishers)
92  self.assertFalse(topic2 in manager._publishers)
93  self.assertFalse(self.is_topic_published(topic1))
94  self.assertFalse(self.is_topic_published(topic2))
95  manager.register(client, topic1, msg_type)
96  self.assertTrue(topic1 in manager._publishers)
97  self.assertTrue(self.is_topic_published(topic1))
98  self.assertFalse(topic2 in manager._publishers)
99  self.assertFalse(self.is_topic_published(topic2))
100  manager.register(client, topic2, msg_type)
101  self.assertTrue(topic1 in manager._publishers)
102  self.assertTrue(self.is_topic_published(topic1))
103  self.assertTrue(topic2 in manager._publishers)
104  self.assertTrue(self.is_topic_published(topic2))
105 
106  manager.unregister(client, topic1)
107  self.assertTrue(self.is_topic_published(topic1))
108  self.assertTrue(topic1 in manager.unregister_timers)
109  self.assertTrue(topic2 in manager._publishers)
110  self.assertTrue(self.is_topic_published(topic2))
111 
112  manager.unregister(client, topic2)
113  self.assertTrue(topic2 in manager.unregister_timers)
114  self.assertTrue(self.is_topic_published(topic2))
115  sleep(manager.unregister_timeout*1.1)
116  self.assertFalse(topic1 in manager._publishers)
117  self.assertFalse(self.is_topic_published(topic1))
118  self.assertFalse(topic2 in manager._publishers)
119  self.assertFalse(self.is_topic_published(topic2))
120  self.assertFalse(topic1 in manager.unregister_timers)
121  self.assertFalse(topic2 in manager.unregister_timers)
122 
124  topic = "/test_register_no_msgtype"
125  client = "client_test_register_no_msgtype"
126 
127  self.assertFalse(topic in manager._publishers)
128  self.assertFalse(self.is_topic_published(topic))
129  self.assertRaises(TopicNotEstablishedException, manager.register, client, topic)
130 
132  topic = "/test_register_infer_topictype"
133  client = "client_test_register_infer_topictype"
134 
135  self.assertFalse(self.is_topic_published(topic))
136 
137  rospy.Publisher(topic, String)
138 
139  self.assertTrue(self.is_topic_published(topic))
140  self.assertFalse(topic in manager._publishers)
141  manager.register(client, topic)
142  self.assertTrue(topic in manager._publishers)
143  self.assertTrue(self.is_topic_published(topic))
144 
145  manager.unregister(client, topic)
146  self.assertTrue(topic in manager.unregister_timers)
147  self.assertTrue(self.is_topic_published(topic))
148 
150  topic = "/test_register_multiple_notopictype"
151  msg_type = "std_msgs/String"
152  client1 = "client_test_register_multiple_notopictype_1"
153  client2 = "client_test_register_multiple_notopictype_2"
154 
155  self.assertFalse(topic in manager._publishers)
156  self.assertFalse(topic in manager.unregister_timers)
157  self.assertFalse(self.is_topic_published(topic))
158  manager.register(client1, topic, msg_type)
159  self.assertTrue(topic in manager._publishers)
160  self.assertTrue(self.is_topic_published(topic))
161  manager.register(client2, topic)
162  self.assertTrue(topic in manager._publishers)
163  self.assertTrue(self.is_topic_published(topic))
164  manager.unregister(client1, topic)
165  self.assertTrue(topic in manager._publishers)
166  self.assertTrue(topic in manager.unregister_timers)
167  self.assertTrue(self.is_topic_published(topic))
168  manager.unregister(client2, topic)
169  self.assertTrue(topic in manager.unregister_timers)
170  self.assertTrue(topic in manager._publishers)
171  sleep(manager.unregister_timeout*1.1)
172  self.assertFalse(topic in manager._publishers)
173  self.assertFalse(topic in manager.unregister_timers)
174  self.assertFalse(self.is_topic_published(topic))
175 
177  topic = "/test_publish_not_registered"
178  msg = {"data": "test publish not registered"}
179  client = "client_test_publish_not_registered"
180 
181  self.assertFalse(topic in manager._publishers)
182  self.assertFalse(self.is_topic_published(topic))
183  self.assertRaises(TopicNotEstablishedException, manager.publish, client, topic, msg)
184 
186  """ Make sure that publishing works """
187  topic = "/test_publisher_manager_publish"
188  msg = {"data": "test publisher manager publish"}
189  client = "client_test_publisher_manager_publish"
190 
191  received = {"msg": None}
192 
193  def cb(msg):
194  received["msg"] = msg
195 
196  rospy.Subscriber(topic, String, cb)
197  manager.publish(client, topic, msg)
198  sleep(0.5)
199 
200  self.assertEqual(received["msg"].data, msg["data"])
201 
203  """ Make sure that bad publishing fails """
204  topic = "/test_publisher_manager_bad_publish"
205  client = "client_test_publisher_manager_bad_publish"
206  msg_type = "std_msgs/String"
207  msg = {"data": 3}
208 
209  manager.register(client, topic, msg_type)
210  self.assertRaises(FieldTypeMismatchException, manager.publish, client, topic, msg)
211 
212 
213 PKG = 'rosbridge_library'
214 NAME = 'test_publisher_manager'
215 if __name__ == '__main__':
216  rostest.unitrun(PKG, NAME, TestPublisherManager)
217 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Wed Jun 3 2020 03:55:14