15 PKG =
'rosbridge_library'
16 NAME =
'test_publisher_manager'
24 manager.unregister_timeout = 1.0
27 return topicname
in dict(rospy.get_published_topics()).keys()
30 """ Register a publisher on a clean topic with a good msg type """
31 topic =
"/test_register_publisher"
32 msg_type =
"std_msgs/String"
33 client =
"client_test_register_publisher"
35 self.assertFalse(topic
in manager._publishers)
37 manager.register(client, topic, msg_type)
38 self.assertTrue(topic
in manager._publishers)
41 manager.unregister(client, topic)
42 self.assertTrue(topic
in manager.unregister_timers)
43 self.assertTrue(topic
in manager._publishers)
45 sleep(manager.unregister_timeout*1.1)
46 self.assertFalse(topic
in manager._publishers)
48 self.assertFalse(topic
in manager.unregister_timers)
51 topic =
"/test_register_publisher_multiclient"
52 msg_type =
"std_msgs/String"
53 client1 =
"client_test_register_publisher_1"
54 client2 =
"client_test_register_publisher_2"
56 self.assertFalse(topic
in manager._publishers)
58 manager.register(client1, topic, msg_type)
59 self.assertTrue(topic
in manager._publishers)
61 manager.register(client2, topic, msg_type)
62 self.assertTrue(topic
in manager._publishers)
64 manager.unregister(client1, topic)
65 self.assertTrue(topic
in manager._publishers)
67 manager.unregister(client2, topic)
68 self.assertTrue(topic
in manager.unregister_timers)
69 self.assertTrue(topic
in manager._publishers)
71 sleep(manager.unregister_timeout*1.1)
72 self.assertFalse(topic
in manager._publishers)
74 self.assertFalse(topic
in manager.unregister_timers)
77 topic =
"/test_register_publisher_conflicting_types"
78 msg_type =
"std_msgs/String"
79 msg_type_bad =
"std_msgs/Int32"
80 client =
"client_test_register_publisher_conflicting_types"
82 self.assertFalse(topic
in manager._publishers)
84 manager.register(client, topic, msg_type)
85 self.assertTrue(topic
in manager._publishers)
88 self.assertRaises(TypeConflictException, manager.register,
"client2", topic, msg_type_bad)
91 topic1 =
"/test_register_multiple_publishers1"
92 topic2 =
"/test_register_multiple_publishers2"
93 msg_type =
"std_msgs/String"
94 client =
"client_test_register_multiple_publishers"
96 self.assertFalse(topic1
in manager._publishers)
97 self.assertFalse(topic2
in manager._publishers)
100 manager.register(client, topic1, msg_type)
101 self.assertTrue(topic1
in manager._publishers)
103 self.assertFalse(topic2
in manager._publishers)
105 manager.register(client, topic2, msg_type)
106 self.assertTrue(topic1
in manager._publishers)
108 self.assertTrue(topic2
in manager._publishers)
111 manager.unregister(client, topic1)
113 self.assertTrue(topic1
in manager.unregister_timers)
114 self.assertTrue(topic2
in manager._publishers)
117 manager.unregister(client, topic2)
118 self.assertTrue(topic2
in manager.unregister_timers)
120 sleep(manager.unregister_timeout*1.1)
121 self.assertFalse(topic1
in manager._publishers)
123 self.assertFalse(topic2
in manager._publishers)
125 self.assertFalse(topic1
in manager.unregister_timers)
126 self.assertFalse(topic2
in manager.unregister_timers)
129 topic =
"/test_register_no_msgtype"
130 client =
"client_test_register_no_msgtype"
132 self.assertFalse(topic
in manager._publishers)
134 self.assertRaises(TopicNotEstablishedException, manager.register, client, topic)
137 topic =
"/test_register_infer_topictype"
138 client =
"client_test_register_infer_topictype"
142 rospy.Publisher(topic, String)
145 self.assertFalse(topic
in manager._publishers)
146 manager.register(client, topic)
147 self.assertTrue(topic
in manager._publishers)
150 manager.unregister(client, topic)
151 self.assertTrue(topic
in manager.unregister_timers)
155 topic =
"/test_register_multiple_notopictype"
156 msg_type =
"std_msgs/String"
157 client1 =
"client_test_register_multiple_notopictype_1"
158 client2 =
"client_test_register_multiple_notopictype_2"
160 self.assertFalse(topic
in manager._publishers)
161 self.assertFalse(topic
in manager.unregister_timers)
163 manager.register(client1, topic, msg_type)
164 self.assertTrue(topic
in manager._publishers)
166 manager.register(client2, topic)
167 self.assertTrue(topic
in manager._publishers)
169 manager.unregister(client1, topic)
170 self.assertTrue(topic
in manager._publishers)
171 self.assertTrue(topic
in manager.unregister_timers)
173 manager.unregister(client2, topic)
174 self.assertTrue(topic
in manager.unregister_timers)
175 self.assertTrue(topic
in manager._publishers)
176 sleep(manager.unregister_timeout*1.1)
177 self.assertFalse(topic
in manager._publishers)
178 self.assertFalse(topic
in manager.unregister_timers)
182 topic =
"/test_publish_not_registered"
183 msg = {
"data":
"test publish not registered"}
184 client =
"client_test_publish_not_registered"
186 self.assertFalse(topic
in manager._publishers)
188 self.assertRaises(TopicNotEstablishedException, manager.publish, client, topic, msg)
191 """ Make sure that publishing works """
192 topic =
"/test_publisher_manager_publish"
193 msg = {
"data":
"test publisher manager publish"}
194 client =
"client_test_publisher_manager_publish"
196 received = {
"msg":
None}
199 received[
"msg"] = msg
201 rospy.Subscriber(topic, String, cb)
202 manager.publish(client, topic, msg)
205 self.assertEqual(received[
"msg"].data, msg[
"data"])
208 """ Make sure that bad publishing fails """
209 topic =
"/test_publisher_manager_bad_publish"
210 client =
"client_test_publisher_manager_bad_publish"
211 msg_type =
"std_msgs/String"
214 manager.register(client, topic, msg_type)
215 self.assertRaises(FieldTypeMismatchException, manager.publish, client, topic, msg)
218 if __name__ ==
'__main__':
219 rosunit.unitrun(PKG, NAME, TestPublisherManager)