test_subscriber_manager.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import rospy
4 import rosunit
5 import unittest
6 from rosgraph import Master
7 
8 from time import sleep
9 
12 from rosbridge_library.internal.message_conversion import FieldTypeMismatchException
13 from std_msgs.msg import String, Int32
14 
15 
16 PKG = 'rosbridge_library'
17 NAME = 'test_subscriber_manager'
18 
19 
20 class TestSubscriberManager(unittest.TestCase):
21 
22  def setUp(self):
23  rospy.init_node(NAME)
24 
25  def is_topic_published(self, topicname):
26  return topicname in dict(rospy.get_published_topics()).keys()
27 
28  def is_topic_subscribed(self, topicname):
29  return topicname in dict(Master("test_subscriber_manager").getSystemState()[1])
30 
31  def test_subscribe(self):
32  """ Register a publisher on a clean topic with a good msg type """
33  topic = "/test_subscribe"
34  msg_type = "std_msgs/String"
35  client = "client_test_subscribe"
36 
37  self.assertFalse(topic in manager._subscribers)
38  self.assertFalse(self.is_topic_subscribed(topic))
39  manager.subscribe(client, topic, None, msg_type)
40  self.assertTrue(topic in manager._subscribers)
41  self.assertTrue(self.is_topic_subscribed(topic))
42 
43  manager.unsubscribe(client, topic)
44  self.assertFalse(topic in manager._subscribers)
45  self.assertFalse(self.is_topic_subscribed(topic))
46 
48  topic = "/test_register_subscriber_multiclient"
49  msg_type = "std_msgs/String"
50  client1 = "client_test_register_subscriber_multiclient_1"
51  client2 = "client_test_register_subscriber_multiclient_2"
52 
53  self.assertFalse(topic in manager._subscribers)
54  self.assertFalse(self.is_topic_subscribed(topic))
55  manager.subscribe(client1, topic, None, msg_type)
56  self.assertTrue(topic in manager._subscribers)
57  self.assertTrue(self.is_topic_subscribed(topic))
58  manager.subscribe(client2, topic, None, msg_type)
59  self.assertTrue(topic in manager._subscribers)
60  self.assertTrue(self.is_topic_subscribed(topic))
61  manager.unsubscribe(client1, topic)
62  self.assertTrue(topic in manager._subscribers)
63  self.assertTrue(self.is_topic_subscribed(topic))
64  manager.unsubscribe(client2, topic)
65  self.assertFalse(topic in manager._subscribers)
66  self.assertFalse(self.is_topic_subscribed(topic))
67 
69  topic = "/test_register_publisher_conflicting_types"
70  msg_type = "std_msgs/String"
71  msg_type_bad = "std_msgs/Int32"
72  client = "client_test_register_publisher_conflicting_types"
73 
74  self.assertFalse(topic in manager._subscribers)
75  self.assertFalse(self.is_topic_subscribed(topic))
76  manager.subscribe(client, topic, None, msg_type)
77  self.assertTrue(topic in manager._subscribers)
78  self.assertTrue(self.is_topic_subscribed(topic))
79 
80  self.assertRaises(TypeConflictException, manager.subscribe, "client2", topic, None, msg_type_bad)
81 
83  topic1 = "/test_register_multiple_publishers1"
84  topic2 = "/test_register_multiple_publishers2"
85  msg_type = "std_msgs/String"
86  client = "client_test_register_multiple_publishers"
87 
88  self.assertFalse(topic1 in manager._subscribers)
89  self.assertFalse(topic2 in manager._subscribers)
90  self.assertFalse(self.is_topic_subscribed(topic1))
91  self.assertFalse(self.is_topic_subscribed(topic2))
92  manager.subscribe(client, topic1, None, msg_type)
93  self.assertTrue(topic1 in manager._subscribers)
94  self.assertTrue(self.is_topic_subscribed(topic1))
95  self.assertFalse(topic2 in manager._subscribers)
96  self.assertFalse(self.is_topic_subscribed(topic2))
97  manager.subscribe(client, topic2, None, msg_type)
98  self.assertTrue(topic1 in manager._subscribers)
99  self.assertTrue(self.is_topic_subscribed(topic1))
100  self.assertTrue(topic2 in manager._subscribers)
101  self.assertTrue(self.is_topic_subscribed(topic2))
102 
103  manager.unsubscribe(client, topic1)
104  self.assertFalse(topic1 in manager._subscribers)
105  self.assertFalse(self.is_topic_subscribed(topic1))
106  self.assertTrue(topic2 in manager._subscribers)
107  self.assertTrue(self.is_topic_subscribed(topic2))
108 
109  manager.unsubscribe(client, topic2)
110  self.assertFalse(topic1 in manager._subscribers)
111  self.assertFalse(self.is_topic_subscribed(topic1))
112  self.assertFalse(topic2 in manager._subscribers)
113  self.assertFalse(self.is_topic_subscribed(topic2))
114 
116  topic = "/test_register_no_msgtype"
117  client = "client_test_register_no_msgtype"
118 
119  self.assertFalse(topic in manager._subscribers)
120  self.assertFalse(self.is_topic_subscribed(topic))
121  self.assertRaises(TopicNotEstablishedException, manager.subscribe, client, topic, None)
122 
124  topic = "/test_register_infer_topictype"
125  client = "client_test_register_infer_topictype"
126 
127  self.assertFalse(self.is_topic_subscribed(topic))
128 
129  rospy.Subscriber(topic, String, None)
130 
131  self.assertTrue(self.is_topic_subscribed(topic))
132  self.assertFalse(topic in manager._subscribers)
133  manager.subscribe(client, topic, None)
134  self.assertTrue(topic in manager._subscribers)
135  self.assertTrue(self.is_topic_subscribed(topic))
136 
137  manager.unsubscribe(client, topic)
138  self.assertFalse(topic in manager._subscribers)
139  self.assertTrue(self.is_topic_subscribed(topic))
140 
142  topic = "/test_register_multiple_notopictype"
143  msg_type = "std_msgs/String"
144  client1 = "client_test_register_multiple_notopictype_1"
145  client2 = "client_test_register_multiple_notopictype_2"
146 
147  self.assertFalse(topic in manager._subscribers)
148  self.assertFalse(self.is_topic_subscribed(topic))
149  manager.subscribe(client1, topic, None, msg_type)
150  self.assertTrue(topic in manager._subscribers)
151  self.assertTrue(self.is_topic_subscribed(topic))
152  manager.subscribe(client2, topic, None)
153  self.assertTrue(topic in manager._subscribers)
154  self.assertTrue(self.is_topic_subscribed(topic))
155  manager.unsubscribe(client1, topic)
156  self.assertTrue(topic in manager._subscribers)
157  self.assertTrue(self.is_topic_subscribed(topic))
158  manager.unsubscribe(client2, topic)
159  self.assertFalse(topic in manager._subscribers)
160  self.assertFalse(self.is_topic_subscribed(topic))
161 
163  topic = "/test_subscribe_not_registered"
164  client = "client_test_subscribe_not_registered"
165 
166  self.assertFalse(topic in manager._subscribers)
167  self.assertFalse(self.is_topic_subscribed(topic))
168  self.assertRaises(TopicNotEstablishedException, manager.subscribe, client, topic, None)
169 
171  topic = "/test_publisher_manager_publish"
172  msg_type = "std_msgs/String"
173  client = "client_test_publisher_manager_publish"
174 
175  msg = String()
176  msg.data = "dsajfadsufasdjf"
177 
178  pub = rospy.Publisher(topic, String)
179  received = {"msg": None}
180 
181  def cb(msg):
182  received["msg"] = msg.get_json_values()
183 
184  manager.subscribe(client, topic, cb, msg_type)
185  sleep(0.5)
186  pub.publish(msg)
187  sleep(0.5)
188  self.assertEqual(msg.data, received["msg"]["data"])
189 
190 
191 if __name__ == '__main__':
192  rosunit.unitrun(PKG, NAME, TestSubscriberManager)
193 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri Oct 21 2022 02:45:18