test_subscriber_manager.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import rospy
4 import rostest
5 import unittest
6 from rosgraph import Master
7 
8 from time import sleep
9 
12 from rosbridge_library.internal.message_conversion import FieldTypeMismatchException
13 from std_msgs.msg import String, Int32
14 
15 
16 class TestSubscriberManager(unittest.TestCase):
17 
18  def setUp(self):
19  rospy.init_node("test_subscriber_manager")
20 
21  def is_topic_published(self, topicname):
22  return topicname in dict(rospy.get_published_topics()).keys()
23 
24  def is_topic_subscribed(self, topicname):
25  return topicname in dict(Master("test_subscriber_manager").getSystemState()[1])
26 
27  def test_subscribe(self):
28  """ Register a publisher on a clean topic with a good msg type """
29  topic = "/test_subscribe"
30  msg_type = "std_msgs/String"
31  client = "client_test_subscribe"
32 
33  self.assertFalse(topic in manager._subscribers)
34  self.assertFalse(self.is_topic_subscribed(topic))
35  manager.subscribe(client, topic, None, msg_type)
36  self.assertTrue(topic in manager._subscribers)
37  self.assertTrue(self.is_topic_subscribed(topic))
38 
39  manager.unsubscribe(client, topic)
40  self.assertFalse(topic in manager._subscribers)
41  self.assertFalse(self.is_topic_subscribed(topic))
42 
44  topic = "/test_register_subscriber_multiclient"
45  msg_type = "std_msgs/String"
46  client1 = "client_test_register_subscriber_multiclient_1"
47  client2 = "client_test_register_subscriber_multiclient_2"
48 
49  self.assertFalse(topic in manager._subscribers)
50  self.assertFalse(self.is_topic_subscribed(topic))
51  manager.subscribe(client1, topic, None, msg_type)
52  self.assertTrue(topic in manager._subscribers)
53  self.assertTrue(self.is_topic_subscribed(topic))
54  manager.subscribe(client2, topic, None, msg_type)
55  self.assertTrue(topic in manager._subscribers)
56  self.assertTrue(self.is_topic_subscribed(topic))
57  manager.unsubscribe(client1, topic)
58  self.assertTrue(topic in manager._subscribers)
59  self.assertTrue(self.is_topic_subscribed(topic))
60  manager.unsubscribe(client2, topic)
61  self.assertFalse(topic in manager._subscribers)
62  self.assertFalse(self.is_topic_subscribed(topic))
63 
65  topic = "/test_register_publisher_conflicting_types"
66  msg_type = "std_msgs/String"
67  msg_type_bad = "std_msgs/Int32"
68  client = "client_test_register_publisher_conflicting_types"
69 
70  self.assertFalse(topic in manager._subscribers)
71  self.assertFalse(self.is_topic_subscribed(topic))
72  manager.subscribe(client, topic, None, msg_type)
73  self.assertTrue(topic in manager._subscribers)
74  self.assertTrue(self.is_topic_subscribed(topic))
75 
76  self.assertRaises(TypeConflictException, manager.subscribe, "client2", topic, None, msg_type_bad)
77 
79  topic1 = "/test_register_multiple_publishers1"
80  topic2 = "/test_register_multiple_publishers2"
81  msg_type = "std_msgs/String"
82  client = "client_test_register_multiple_publishers"
83 
84  self.assertFalse(topic1 in manager._subscribers)
85  self.assertFalse(topic2 in manager._subscribers)
86  self.assertFalse(self.is_topic_subscribed(topic1))
87  self.assertFalse(self.is_topic_subscribed(topic2))
88  manager.subscribe(client, topic1, None, msg_type)
89  self.assertTrue(topic1 in manager._subscribers)
90  self.assertTrue(self.is_topic_subscribed(topic1))
91  self.assertFalse(topic2 in manager._subscribers)
92  self.assertFalse(self.is_topic_subscribed(topic2))
93  manager.subscribe(client, topic2, None, msg_type)
94  self.assertTrue(topic1 in manager._subscribers)
95  self.assertTrue(self.is_topic_subscribed(topic1))
96  self.assertTrue(topic2 in manager._subscribers)
97  self.assertTrue(self.is_topic_subscribed(topic2))
98 
99  manager.unsubscribe(client, topic1)
100  self.assertFalse(topic1 in manager._subscribers)
101  self.assertFalse(self.is_topic_subscribed(topic1))
102  self.assertTrue(topic2 in manager._subscribers)
103  self.assertTrue(self.is_topic_subscribed(topic2))
104 
105  manager.unsubscribe(client, topic2)
106  self.assertFalse(topic1 in manager._subscribers)
107  self.assertFalse(self.is_topic_subscribed(topic1))
108  self.assertFalse(topic2 in manager._subscribers)
109  self.assertFalse(self.is_topic_subscribed(topic2))
110 
112  topic = "/test_register_no_msgtype"
113  client = "client_test_register_no_msgtype"
114 
115  self.assertFalse(topic in manager._subscribers)
116  self.assertFalse(self.is_topic_subscribed(topic))
117  self.assertRaises(TopicNotEstablishedException, manager.subscribe, client, topic, None)
118 
120  topic = "/test_register_infer_topictype"
121  client = "client_test_register_infer_topictype"
122 
123  self.assertFalse(self.is_topic_subscribed(topic))
124 
125  rospy.Subscriber(topic, String, None)
126 
127  self.assertTrue(self.is_topic_subscribed(topic))
128  self.assertFalse(topic in manager._subscribers)
129  manager.subscribe(client, topic, None)
130  self.assertTrue(topic in manager._subscribers)
131  self.assertTrue(self.is_topic_subscribed(topic))
132 
133  manager.unsubscribe(client, topic)
134  self.assertFalse(topic in manager._subscribers)
135  self.assertTrue(self.is_topic_subscribed(topic))
136 
138  topic = "/test_register_multiple_notopictype"
139  msg_type = "std_msgs/String"
140  client1 = "client_test_register_multiple_notopictype_1"
141  client2 = "client_test_register_multiple_notopictype_2"
142 
143  self.assertFalse(topic in manager._subscribers)
144  self.assertFalse(self.is_topic_subscribed(topic))
145  manager.subscribe(client1, topic, None, msg_type)
146  self.assertTrue(topic in manager._subscribers)
147  self.assertTrue(self.is_topic_subscribed(topic))
148  manager.subscribe(client2, topic, None)
149  self.assertTrue(topic in manager._subscribers)
150  self.assertTrue(self.is_topic_subscribed(topic))
151  manager.unsubscribe(client1, topic)
152  self.assertTrue(topic in manager._subscribers)
153  self.assertTrue(self.is_topic_subscribed(topic))
154  manager.unsubscribe(client2, topic)
155  self.assertFalse(topic in manager._subscribers)
156  self.assertFalse(self.is_topic_subscribed(topic))
157 
159  topic = "/test_subscribe_not_registered"
160  client = "client_test_subscribe_not_registered"
161 
162  self.assertFalse(topic in manager._subscribers)
163  self.assertFalse(self.is_topic_subscribed(topic))
164  self.assertRaises(TopicNotEstablishedException, manager.subscribe, client, topic, None)
165 
167  topic = "/test_publisher_manager_publish"
168  msg_type = "std_msgs/String"
169  client = "client_test_publisher_manager_publish"
170 
171  msg = String()
172  msg.data = "dsajfadsufasdjf"
173 
174  pub = rospy.Publisher(topic, String)
175  received = {"msg": None}
176 
177  def cb(msg):
178  received["msg"] = msg.get_json_values()
179 
180  manager.subscribe(client, topic, cb, msg_type)
181  sleep(0.5)
182  pub.publish(msg)
183  sleep(0.5)
184  self.assertEqual(msg.data, received["msg"]["data"])
185 
186 
# Package and test names passed to rostest when this file is run directly.
PKG = 'rosbridge_library'
NAME = 'test_subscriber_manager'

# Only start the test run when executed as a script, not on import.
if __name__ == '__main__':
    rostest.unitrun(PKG, NAME, TestSubscriberManager)
191 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Wed Jun 3 2020 03:55:14