6 from rosgraph
import Master
19 rospy.init_node(
"test_subscriber_manager")
22 return topicname
in dict(rospy.get_published_topics()).keys()
25 return topicname
in dict(Master(
"test_subscriber_manager").getSystemState()[1])
28 """ Register a publisher on a clean topic with a good msg type """ 29 topic =
"/test_subscribe" 30 msg_type =
"std_msgs/String" 31 client =
"client_test_subscribe" 33 self.assertFalse(topic
in manager._subscribers)
35 manager.subscribe(client, topic,
None, msg_type)
36 self.assertTrue(topic
in manager._subscribers)
39 manager.unsubscribe(client, topic)
40 self.assertFalse(topic
in manager._subscribers)
44 topic =
"/test_register_subscriber_multiclient" 45 msg_type =
"std_msgs/String" 46 client1 =
"client_test_register_subscriber_multiclient_1" 47 client2 =
"client_test_register_subscriber_multiclient_2" 49 self.assertFalse(topic
in manager._subscribers)
51 manager.subscribe(client1, topic,
None, msg_type)
52 self.assertTrue(topic
in manager._subscribers)
54 manager.subscribe(client2, topic,
None, msg_type)
55 self.assertTrue(topic
in manager._subscribers)
57 manager.unsubscribe(client1, topic)
58 self.assertTrue(topic
in manager._subscribers)
60 manager.unsubscribe(client2, topic)
61 self.assertFalse(topic
in manager._subscribers)
65 topic =
"/test_register_publisher_conflicting_types" 66 msg_type =
"std_msgs/String" 67 msg_type_bad =
"std_msgs/Int32" 68 client =
"client_test_register_publisher_conflicting_types" 70 self.assertFalse(topic
in manager._subscribers)
72 manager.subscribe(client, topic,
None, msg_type)
73 self.assertTrue(topic
in manager._subscribers)
76 self.assertRaises(TypeConflictException, manager.subscribe,
"client2", topic,
None, msg_type_bad)
79 topic1 =
"/test_register_multiple_publishers1" 80 topic2 =
"/test_register_multiple_publishers2" 81 msg_type =
"std_msgs/String" 82 client =
"client_test_register_multiple_publishers" 84 self.assertFalse(topic1
in manager._subscribers)
85 self.assertFalse(topic2
in manager._subscribers)
88 manager.subscribe(client, topic1,
None, msg_type)
89 self.assertTrue(topic1
in manager._subscribers)
91 self.assertFalse(topic2
in manager._subscribers)
93 manager.subscribe(client, topic2,
None, msg_type)
94 self.assertTrue(topic1
in manager._subscribers)
96 self.assertTrue(topic2
in manager._subscribers)
99 manager.unsubscribe(client, topic1)
100 self.assertFalse(topic1
in manager._subscribers)
102 self.assertTrue(topic2
in manager._subscribers)
105 manager.unsubscribe(client, topic2)
106 self.assertFalse(topic1
in manager._subscribers)
108 self.assertFalse(topic2
in manager._subscribers)
112 topic =
"/test_register_no_msgtype" 113 client =
"client_test_register_no_msgtype" 115 self.assertFalse(topic
in manager._subscribers)
117 self.assertRaises(TopicNotEstablishedException, manager.subscribe, client, topic,
None)
120 topic =
"/test_register_infer_topictype" 121 client =
"client_test_register_infer_topictype" 125 rospy.Subscriber(topic, String,
None)
128 self.assertFalse(topic
in manager._subscribers)
129 manager.subscribe(client, topic,
None)
130 self.assertTrue(topic
in manager._subscribers)
133 manager.unsubscribe(client, topic)
134 self.assertFalse(topic
in manager._subscribers)
138 topic =
"/test_register_multiple_notopictype" 139 msg_type =
"std_msgs/String" 140 client1 =
"client_test_register_multiple_notopictype_1" 141 client2 =
"client_test_register_multiple_notopictype_2" 143 self.assertFalse(topic
in manager._subscribers)
145 manager.subscribe(client1, topic,
None, msg_type)
146 self.assertTrue(topic
in manager._subscribers)
148 manager.subscribe(client2, topic,
None)
149 self.assertTrue(topic
in manager._subscribers)
151 manager.unsubscribe(client1, topic)
152 self.assertTrue(topic
in manager._subscribers)
154 manager.unsubscribe(client2, topic)
155 self.assertFalse(topic
in manager._subscribers)
159 topic =
"/test_subscribe_not_registered" 160 client =
"client_test_subscribe_not_registered" 162 self.assertFalse(topic
in manager._subscribers)
164 self.assertRaises(TopicNotEstablishedException, manager.subscribe, client, topic,
None)
167 topic =
"/test_publisher_manager_publish" 168 msg_type =
"std_msgs/String" 169 client =
"client_test_publisher_manager_publish" 172 msg.data =
"dsajfadsufasdjf" 174 pub = rospy.Publisher(topic, String)
175 received = {
"msg":
None}
178 received[
"msg"] = msg.get_json_values()
180 manager.subscribe(client, topic, cb, msg_type)
184 self.assertEqual(msg.data, received[
"msg"][
"data"])
187 PKG =
'rosbridge_library' 188 NAME =
'test_subscriber_manager' 189 if __name__ ==
'__main__':
190 rostest.unitrun(PKG, NAME, TestSubscriberManager)
def test_register_no_msgtype(self)
def test_register_infer_topictype(self)
def test_register_multiple_publishers(self)
def test_register_multiple_notopictype(self)
def test_register_publisher_conflicting_types(self)
def is_topic_subscribed(self, topicname)
def is_topic_published(self, topicname)
def test_register_subscriber_multiclient(self)
def test_publisher_manager_publish(self)
def test_subscribe_not_registered(self)