6 from rosgraph
import Master
# Identifiers handed to rosunit.unitrun() by the script entry point of this
# test module: the package name and the name of this test.
PKG = 'rosbridge_library'
NAME = 'test_subscriber_manager'
26 return topicname
in dict(rospy.get_published_topics()).keys()
29 return topicname
in dict(Master(
"test_subscriber_manager").getSystemState()[1])
32 """ Register a publisher on a clean topic with a good msg type """
33 topic =
"/test_subscribe"
34 msg_type =
"std_msgs/String"
35 client =
"client_test_subscribe"
37 self.assertFalse(topic
in manager._subscribers)
39 manager.subscribe(client, topic,
None, msg_type)
40 self.assertTrue(topic
in manager._subscribers)
43 manager.unsubscribe(client, topic)
44 self.assertFalse(topic
in manager._subscribers)
48 topic =
"/test_register_subscriber_multiclient"
49 msg_type =
"std_msgs/String"
50 client1 =
"client_test_register_subscriber_multiclient_1"
51 client2 =
"client_test_register_subscriber_multiclient_2"
53 self.assertFalse(topic
in manager._subscribers)
55 manager.subscribe(client1, topic,
None, msg_type)
56 self.assertTrue(topic
in manager._subscribers)
58 manager.subscribe(client2, topic,
None, msg_type)
59 self.assertTrue(topic
in manager._subscribers)
61 manager.unsubscribe(client1, topic)
62 self.assertTrue(topic
in manager._subscribers)
64 manager.unsubscribe(client2, topic)
65 self.assertFalse(topic
in manager._subscribers)
69 topic =
"/test_register_publisher_conflicting_types"
70 msg_type =
"std_msgs/String"
71 msg_type_bad =
"std_msgs/Int32"
72 client =
"client_test_register_publisher_conflicting_types"
74 self.assertFalse(topic
in manager._subscribers)
76 manager.subscribe(client, topic,
None, msg_type)
77 self.assertTrue(topic
in manager._subscribers)
80 self.assertRaises(TypeConflictException, manager.subscribe,
"client2", topic,
None, msg_type_bad)
83 topic1 =
"/test_register_multiple_publishers1"
84 topic2 =
"/test_register_multiple_publishers2"
85 msg_type =
"std_msgs/String"
86 client =
"client_test_register_multiple_publishers"
88 self.assertFalse(topic1
in manager._subscribers)
89 self.assertFalse(topic2
in manager._subscribers)
92 manager.subscribe(client, topic1,
None, msg_type)
93 self.assertTrue(topic1
in manager._subscribers)
95 self.assertFalse(topic2
in manager._subscribers)
97 manager.subscribe(client, topic2,
None, msg_type)
98 self.assertTrue(topic1
in manager._subscribers)
100 self.assertTrue(topic2
in manager._subscribers)
103 manager.unsubscribe(client, topic1)
104 self.assertFalse(topic1
in manager._subscribers)
106 self.assertTrue(topic2
in manager._subscribers)
109 manager.unsubscribe(client, topic2)
110 self.assertFalse(topic1
in manager._subscribers)
112 self.assertFalse(topic2
in manager._subscribers)
116 topic =
"/test_register_no_msgtype"
117 client =
"client_test_register_no_msgtype"
119 self.assertFalse(topic
in manager._subscribers)
121 self.assertRaises(TopicNotEstablishedException, manager.subscribe, client, topic,
None)
124 topic =
"/test_register_infer_topictype"
125 client =
"client_test_register_infer_topictype"
129 rospy.Subscriber(topic, String,
None)
132 self.assertFalse(topic
in manager._subscribers)
133 manager.subscribe(client, topic,
None)
134 self.assertTrue(topic
in manager._subscribers)
137 manager.unsubscribe(client, topic)
138 self.assertFalse(topic
in manager._subscribers)
142 topic =
"/test_register_multiple_notopictype"
143 msg_type =
"std_msgs/String"
144 client1 =
"client_test_register_multiple_notopictype_1"
145 client2 =
"client_test_register_multiple_notopictype_2"
147 self.assertFalse(topic
in manager._subscribers)
149 manager.subscribe(client1, topic,
None, msg_type)
150 self.assertTrue(topic
in manager._subscribers)
152 manager.subscribe(client2, topic,
None)
153 self.assertTrue(topic
in manager._subscribers)
155 manager.unsubscribe(client1, topic)
156 self.assertTrue(topic
in manager._subscribers)
158 manager.unsubscribe(client2, topic)
159 self.assertFalse(topic
in manager._subscribers)
163 topic =
"/test_subscribe_not_registered"
164 client =
"client_test_subscribe_not_registered"
166 self.assertFalse(topic
in manager._subscribers)
168 self.assertRaises(TopicNotEstablishedException, manager.subscribe, client, topic,
None)
171 topic =
"/test_publisher_manager_publish"
172 msg_type =
"std_msgs/String"
173 client =
"client_test_publisher_manager_publish"
176 msg.data =
"dsajfadsufasdjf"
178 pub = rospy.Publisher(topic, String)
179 received = {
"msg":
None}
182 received[
"msg"] = msg.get_json_values()
184 manager.subscribe(client, topic, cb, msg_type)
188 self.assertEqual(msg.data, received[
"msg"][
"data"])
# Script entry point: when this file is executed directly (not imported),
# run the TestSubscriberManager test case through rosunit.unitrun, passing
# the module-level PKG and NAME identifiers.
if __name__ == '__main__':
    rosunit.unitrun(PKG, NAME, TestSubscriberManager)