18 rospy.init_node(
"test_publisher_manager")
19 manager.unregister_timeout = 1.0
22 return topicname
in dict(rospy.get_published_topics()).keys()
25 """ Register a publisher on a clean topic with a good msg type """ 26 topic =
"/test_register_publisher" 27 msg_type =
"std_msgs/String" 28 client =
"client_test_register_publisher" 30 self.assertFalse(topic
in manager._publishers)
32 manager.register(client, topic, msg_type)
33 self.assertTrue(topic
in manager._publishers)
36 manager.unregister(client, topic)
37 self.assertTrue(topic
in manager.unregister_timers)
38 self.assertTrue(topic
in manager._publishers)
40 sleep(manager.unregister_timeout*1.1)
41 self.assertFalse(topic
in manager._publishers)
43 self.assertFalse(topic
in manager.unregister_timers)
46 topic =
"/test_register_publisher_multiclient" 47 msg_type =
"std_msgs/String" 48 client1 =
"client_test_register_publisher_1" 49 client2 =
"client_test_register_publisher_2" 51 self.assertFalse(topic
in manager._publishers)
53 manager.register(client1, topic, msg_type)
54 self.assertTrue(topic
in manager._publishers)
56 manager.register(client2, topic, msg_type)
57 self.assertTrue(topic
in manager._publishers)
59 manager.unregister(client1, topic)
60 self.assertTrue(topic
in manager._publishers)
62 manager.unregister(client2, topic)
63 self.assertTrue(topic
in manager.unregister_timers)
64 self.assertTrue(topic
in manager._publishers)
66 sleep(manager.unregister_timeout*1.1)
67 self.assertFalse(topic
in manager._publishers)
69 self.assertFalse(topic
in manager.unregister_timers)
72 topic =
"/test_register_publisher_conflicting_types" 73 msg_type =
"std_msgs/String" 74 msg_type_bad =
"std_msgs/Int32" 75 client =
"client_test_register_publisher_conflicting_types" 77 self.assertFalse(topic
in manager._publishers)
79 manager.register(client, topic, msg_type)
80 self.assertTrue(topic
in manager._publishers)
83 self.assertRaises(TypeConflictException, manager.register,
"client2", topic, msg_type_bad)
86 topic1 =
"/test_register_multiple_publishers1" 87 topic2 =
"/test_register_multiple_publishers2" 88 msg_type =
"std_msgs/String" 89 client =
"client_test_register_multiple_publishers" 91 self.assertFalse(topic1
in manager._publishers)
92 self.assertFalse(topic2
in manager._publishers)
95 manager.register(client, topic1, msg_type)
96 self.assertTrue(topic1
in manager._publishers)
98 self.assertFalse(topic2
in manager._publishers)
100 manager.register(client, topic2, msg_type)
101 self.assertTrue(topic1
in manager._publishers)
103 self.assertTrue(topic2
in manager._publishers)
106 manager.unregister(client, topic1)
108 self.assertTrue(topic1
in manager.unregister_timers)
109 self.assertTrue(topic2
in manager._publishers)
112 manager.unregister(client, topic2)
113 self.assertTrue(topic2
in manager.unregister_timers)
115 sleep(manager.unregister_timeout*1.1)
116 self.assertFalse(topic1
in manager._publishers)
118 self.assertFalse(topic2
in manager._publishers)
120 self.assertFalse(topic1
in manager.unregister_timers)
121 self.assertFalse(topic2
in manager.unregister_timers)
124 topic =
"/test_register_no_msgtype" 125 client =
"client_test_register_no_msgtype" 127 self.assertFalse(topic
in manager._publishers)
129 self.assertRaises(TopicNotEstablishedException, manager.register, client, topic)
132 topic =
"/test_register_infer_topictype" 133 client =
"client_test_register_infer_topictype" 137 rospy.Publisher(topic, String)
140 self.assertFalse(topic
in manager._publishers)
141 manager.register(client, topic)
142 self.assertTrue(topic
in manager._publishers)
145 manager.unregister(client, topic)
146 self.assertTrue(topic
in manager.unregister_timers)
150 topic =
"/test_register_multiple_notopictype" 151 msg_type =
"std_msgs/String" 152 client1 =
"client_test_register_multiple_notopictype_1" 153 client2 =
"client_test_register_multiple_notopictype_2" 155 self.assertFalse(topic
in manager._publishers)
156 self.assertFalse(topic
in manager.unregister_timers)
158 manager.register(client1, topic, msg_type)
159 self.assertTrue(topic
in manager._publishers)
161 manager.register(client2, topic)
162 self.assertTrue(topic
in manager._publishers)
164 manager.unregister(client1, topic)
165 self.assertTrue(topic
in manager._publishers)
166 self.assertTrue(topic
in manager.unregister_timers)
168 manager.unregister(client2, topic)
169 self.assertTrue(topic
in manager.unregister_timers)
170 self.assertTrue(topic
in manager._publishers)
171 sleep(manager.unregister_timeout*1.1)
172 self.assertFalse(topic
in manager._publishers)
173 self.assertFalse(topic
in manager.unregister_timers)
177 topic =
"/test_publish_not_registered" 178 msg = {
"data":
"test publish not registered"}
179 client =
"client_test_publish_not_registered" 181 self.assertFalse(topic
in manager._publishers)
183 self.assertRaises(TopicNotEstablishedException, manager.publish, client, topic, msg)
186 """ Make sure that publishing works """ 187 topic =
"/test_publisher_manager_publish" 188 msg = {
"data":
"test publisher manager publish"}
189 client =
"client_test_publisher_manager_publish" 191 received = {
"msg":
None}
194 received[
"msg"] = msg
196 rospy.Subscriber(topic, String, cb)
197 manager.publish(client, topic, msg)
200 self.assertEqual(received[
"msg"].data, msg[
"data"])
203 """ Make sure that bad publishing fails """ 204 topic =
"/test_publisher_manager_bad_publish" 205 client =
"client_test_publisher_manager_bad_publish" 206 msg_type =
"std_msgs/String" 209 manager.register(client, topic, msg_type)
210 self.assertRaises(FieldTypeMismatchException, manager.publish, client, topic, msg)
213 PKG =
'rosbridge_library' 214 NAME =
'test_publisher_manager' 215 if __name__ ==
'__main__':
216 rostest.unitrun(PKG, NAME, TestPublisherManager)
def test_publisher_manager_publish(self)
def test_register_multiple_publishers(self)
def test_publisher_manager_bad_publish(self)
def test_register_publisher_multiclient(self)
def test_register_publisher(self)
def test_register_no_msgtype(self)
def test_register_publisher_conflicting_types(self)
def test_publish_not_registered(self)
def test_register_multiple_notopictype(self)
def is_topic_published(self, topicname)
def test_register_infer_topictype(self)