15 PKG =
'rosbridge_library' 16 NAME =
'test_publisher_manager' 24 manager.unregister_timeout = 1.0
27 return topicname
in dict(rospy.get_published_topics()).keys()
30 """ Register a publisher on a clean topic with a good msg type """ 31 topic =
"/test_register_publisher" 32 msg_type =
"std_msgs/String" 33 client =
"client_test_register_publisher" 35 self.assertFalse(topic
in manager._publishers)
37 manager.register(client, topic, msg_type)
38 self.assertTrue(topic
in manager._publishers)
41 manager.unregister(client, topic)
42 self.assertTrue(topic
in manager.unregister_timers)
43 self.assertTrue(topic
in manager._publishers)
45 sleep(manager.unregister_timeout*1.1)
46 self.assertFalse(topic
in manager._publishers)
48 self.assertFalse(topic
in manager.unregister_timers)
51 topic =
"/test_register_publisher_multiclient" 52 msg_type =
"std_msgs/String" 53 client1 =
"client_test_register_publisher_1" 54 client2 =
"client_test_register_publisher_2" 56 self.assertFalse(topic
in manager._publishers)
58 manager.register(client1, topic, msg_type)
59 self.assertTrue(topic
in manager._publishers)
61 manager.register(client2, topic, msg_type)
62 self.assertTrue(topic
in manager._publishers)
64 manager.unregister(client1, topic)
65 self.assertTrue(topic
in manager._publishers)
67 manager.unregister(client2, topic)
68 self.assertTrue(topic
in manager.unregister_timers)
69 self.assertTrue(topic
in manager._publishers)
71 sleep(manager.unregister_timeout*1.1)
72 self.assertFalse(topic
in manager._publishers)
74 self.assertFalse(topic
in manager.unregister_timers)
77 topic =
"/test_register_publisher_conflicting_types" 78 msg_type =
"std_msgs/String" 79 msg_type_bad =
"std_msgs/Int32" 80 client =
"client_test_register_publisher_conflicting_types" 82 self.assertFalse(topic
in manager._publishers)
84 manager.register(client, topic, msg_type)
85 self.assertTrue(topic
in manager._publishers)
88 self.assertRaises(TypeConflictException, manager.register,
"client2", topic, msg_type_bad)
91 topic1 =
"/test_register_multiple_publishers1" 92 topic2 =
"/test_register_multiple_publishers2" 93 msg_type =
"std_msgs/String" 94 client =
"client_test_register_multiple_publishers" 96 self.assertFalse(topic1
in manager._publishers)
97 self.assertFalse(topic2
in manager._publishers)
100 manager.register(client, topic1, msg_type)
101 self.assertTrue(topic1
in manager._publishers)
103 self.assertFalse(topic2
in manager._publishers)
105 manager.register(client, topic2, msg_type)
106 self.assertTrue(topic1
in manager._publishers)
108 self.assertTrue(topic2
in manager._publishers)
111 manager.unregister(client, topic1)
113 self.assertTrue(topic1
in manager.unregister_timers)
114 self.assertTrue(topic2
in manager._publishers)
117 manager.unregister(client, topic2)
118 self.assertTrue(topic2
in manager.unregister_timers)
120 sleep(manager.unregister_timeout*1.1)
121 self.assertFalse(topic1
in manager._publishers)
123 self.assertFalse(topic2
in manager._publishers)
125 self.assertFalse(topic1
in manager.unregister_timers)
126 self.assertFalse(topic2
in manager.unregister_timers)
129 topic =
"/test_register_no_msgtype" 130 client =
"client_test_register_no_msgtype" 132 self.assertFalse(topic
in manager._publishers)
134 self.assertRaises(TopicNotEstablishedException, manager.register, client, topic)
137 topic =
"/test_register_infer_topictype" 138 client =
"client_test_register_infer_topictype" 142 rospy.Publisher(topic, String)
145 self.assertFalse(topic
in manager._publishers)
146 manager.register(client, topic)
147 self.assertTrue(topic
in manager._publishers)
150 manager.unregister(client, topic)
151 self.assertTrue(topic
in manager.unregister_timers)
155 topic =
"/test_register_multiple_notopictype" 156 msg_type =
"std_msgs/String" 157 client1 =
"client_test_register_multiple_notopictype_1" 158 client2 =
"client_test_register_multiple_notopictype_2" 160 self.assertFalse(topic
in manager._publishers)
161 self.assertFalse(topic
in manager.unregister_timers)
163 manager.register(client1, topic, msg_type)
164 self.assertTrue(topic
in manager._publishers)
166 manager.register(client2, topic)
167 self.assertTrue(topic
in manager._publishers)
169 manager.unregister(client1, topic)
170 self.assertTrue(topic
in manager._publishers)
171 self.assertTrue(topic
in manager.unregister_timers)
173 manager.unregister(client2, topic)
174 self.assertTrue(topic
in manager.unregister_timers)
175 self.assertTrue(topic
in manager._publishers)
176 sleep(manager.unregister_timeout*1.1)
177 self.assertFalse(topic
in manager._publishers)
178 self.assertFalse(topic
in manager.unregister_timers)
182 topic =
"/test_publish_not_registered" 183 msg = {
"data":
"test publish not registered"}
184 client =
"client_test_publish_not_registered" 186 self.assertFalse(topic
in manager._publishers)
188 self.assertRaises(TopicNotEstablishedException, manager.publish, client, topic, msg)
191 """ Make sure that publishing works """ 192 topic =
"/test_publisher_manager_publish" 193 msg = {
"data":
"test publisher manager publish"}
194 client =
"client_test_publisher_manager_publish" 196 received = {
"msg":
None}
199 received[
"msg"] = msg
201 rospy.Subscriber(topic, String, cb)
202 manager.publish(client, topic, msg)
205 self.assertEqual(received[
"msg"].data, msg[
"data"])
208 """ Make sure that bad publishing fails """ 209 topic =
"/test_publisher_manager_bad_publish" 210 client =
"client_test_publisher_manager_bad_publish" 211 msg_type =
"std_msgs/String" 214 manager.register(client, topic, msg_type)
215 self.assertRaises(FieldTypeMismatchException, manager.publish, client, topic, msg)
218 if __name__ ==
'__main__':
219 rosunit.unitrun(PKG, NAME, TestPublisherManager)
def test_publisher_manager_publish(self)
def test_register_multiple_publishers(self)
def test_publisher_manager_bad_publish(self)
def test_register_publisher_multiclient(self)
def test_register_publisher(self)
def test_register_no_msgtype(self)
def test_register_publisher_conflicting_types(self)
def test_publish_not_registered(self)
def test_register_multiple_notopictype(self)
def is_topic_published(self, topicname)
def test_register_infer_topictype(self)