6 from rosgraph
import Master
16 PKG =
'rosbridge_library' 17 NAME =
'test_subscriber_manager' 26 return topicname
in dict(rospy.get_published_topics()).keys()
29 return topicname
in dict(Master(
"test_subscriber_manager").getSystemState()[1])
32 """ Register a publisher on a clean topic with a good msg type """ 33 topic =
"/test_subscribe" 34 msg_type =
"std_msgs/String" 35 client =
"client_test_subscribe" 37 self.assertFalse(topic
in manager._subscribers)
39 manager.subscribe(client, topic,
None, msg_type)
40 self.assertTrue(topic
in manager._subscribers)
43 manager.unsubscribe(client, topic)
44 self.assertFalse(topic
in manager._subscribers)
48 topic =
"/test_register_subscriber_multiclient" 49 msg_type =
"std_msgs/String" 50 client1 =
"client_test_register_subscriber_multiclient_1" 51 client2 =
"client_test_register_subscriber_multiclient_2" 53 self.assertFalse(topic
in manager._subscribers)
55 manager.subscribe(client1, topic,
None, msg_type)
56 self.assertTrue(topic
in manager._subscribers)
58 manager.subscribe(client2, topic,
None, msg_type)
59 self.assertTrue(topic
in manager._subscribers)
61 manager.unsubscribe(client1, topic)
62 self.assertTrue(topic
in manager._subscribers)
64 manager.unsubscribe(client2, topic)
65 self.assertFalse(topic
in manager._subscribers)
69 topic =
"/test_register_publisher_conflicting_types" 70 msg_type =
"std_msgs/String" 71 msg_type_bad =
"std_msgs/Int32" 72 client =
"client_test_register_publisher_conflicting_types" 74 self.assertFalse(topic
in manager._subscribers)
76 manager.subscribe(client, topic,
None, msg_type)
77 self.assertTrue(topic
in manager._subscribers)
80 self.assertRaises(TypeConflictException, manager.subscribe,
"client2", topic,
None, msg_type_bad)
83 topic1 =
"/test_register_multiple_publishers1" 84 topic2 =
"/test_register_multiple_publishers2" 85 msg_type =
"std_msgs/String" 86 client =
"client_test_register_multiple_publishers" 88 self.assertFalse(topic1
in manager._subscribers)
89 self.assertFalse(topic2
in manager._subscribers)
92 manager.subscribe(client, topic1,
None, msg_type)
93 self.assertTrue(topic1
in manager._subscribers)
95 self.assertFalse(topic2
in manager._subscribers)
97 manager.subscribe(client, topic2,
None, msg_type)
98 self.assertTrue(topic1
in manager._subscribers)
100 self.assertTrue(topic2
in manager._subscribers)
103 manager.unsubscribe(client, topic1)
104 self.assertFalse(topic1
in manager._subscribers)
106 self.assertTrue(topic2
in manager._subscribers)
109 manager.unsubscribe(client, topic2)
110 self.assertFalse(topic1
in manager._subscribers)
112 self.assertFalse(topic2
in manager._subscribers)
116 topic =
"/test_register_no_msgtype" 117 client =
"client_test_register_no_msgtype" 119 self.assertFalse(topic
in manager._subscribers)
121 self.assertRaises(TopicNotEstablishedException, manager.subscribe, client, topic,
None)
124 topic =
"/test_register_infer_topictype" 125 client =
"client_test_register_infer_topictype" 129 rospy.Subscriber(topic, String,
None)
132 self.assertFalse(topic
in manager._subscribers)
133 manager.subscribe(client, topic,
None)
134 self.assertTrue(topic
in manager._subscribers)
137 manager.unsubscribe(client, topic)
138 self.assertFalse(topic
in manager._subscribers)
142 topic =
"/test_register_multiple_notopictype" 143 msg_type =
"std_msgs/String" 144 client1 =
"client_test_register_multiple_notopictype_1" 145 client2 =
"client_test_register_multiple_notopictype_2" 147 self.assertFalse(topic
in manager._subscribers)
149 manager.subscribe(client1, topic,
None, msg_type)
150 self.assertTrue(topic
in manager._subscribers)
152 manager.subscribe(client2, topic,
None)
153 self.assertTrue(topic
in manager._subscribers)
155 manager.unsubscribe(client1, topic)
156 self.assertTrue(topic
in manager._subscribers)
158 manager.unsubscribe(client2, topic)
159 self.assertFalse(topic
in manager._subscribers)
163 topic =
"/test_subscribe_not_registered" 164 client =
"client_test_subscribe_not_registered" 166 self.assertFalse(topic
in manager._subscribers)
168 self.assertRaises(TopicNotEstablishedException, manager.subscribe, client, topic,
None)
171 topic =
"/test_publisher_manager_publish" 172 msg_type =
"std_msgs/String" 173 client =
"client_test_publisher_manager_publish" 176 msg.data =
"dsajfadsufasdjf" 178 pub = rospy.Publisher(topic, String)
179 received = {
"msg":
None}
182 received[
"msg"] = msg.get_json_values()
184 manager.subscribe(client, topic, cb, msg_type)
188 self.assertEqual(msg.data, received[
"msg"][
"data"])
191 if __name__ ==
'__main__':
192 rosunit.unitrun(PKG, NAME, TestSubscriberManager)
def test_register_no_msgtype(self)
def test_register_infer_topictype(self)
def test_register_multiple_publishers(self)
def test_register_multiple_notopictype(self)
def test_register_publisher_conflicting_types(self)
def is_topic_subscribed(self, topicname)
def is_topic_published(self, topicname)
def test_register_subscriber_multiclient(self)
def test_publisher_manager_publish(self)
def test_subscribe_not_registered(self)