test_multi_subscriber.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import rospy
4 import rosunit
5 import unittest
6 from rosgraph import Master
7 
8 from time import sleep, time
9 
12 from rosbridge_library.internal import ros_loader
14 from std_msgs.msg import String, Int32
15 
16 
17 PKG = 'rosbridge_library'
18 NAME = 'test_multi_subscriber'
19 
20 
21 class TestMultiSubscriber(unittest.TestCase):
22 
23  def setUp(self):
24  rospy.init_node(NAME)
25 
26  def is_topic_published(self, topicname):
27  return topicname in dict(rospy.get_published_topics()).keys()
28 
29  def is_topic_subscribed(self, topicname):
30  return topicname in dict(Master("test_multi_subscriber").getSystemState()[1])
31 
33  """ Register a subscriber on a clean topic with a good msg type """
34  topic = "/test_register_multisubscriber"
35  msg_type = "std_msgs/String"
36 
37  self.assertFalse(self.is_topic_subscribed(topic))
38  MultiSubscriber(topic, msg_type)
39  self.assertTrue(self.is_topic_subscribed(topic))
40 
42  """ Register and unregister a subscriber on a clean topic with a good msg type """
43  topic = "/test_unregister_multisubscriber"
44  msg_type = "std_msgs/String"
45 
46  self.assertFalse(self.is_topic_subscribed(topic))
47  multi = MultiSubscriber(topic, msg_type)
48  self.assertTrue(self.is_topic_subscribed(topic))
49  multi.unregister()
50  self.assertFalse(self.is_topic_subscribed(topic))
51 
52  def test_verify_type(self):
53  topic = "/test_verify_type"
54  msg_type = "std_msgs/String"
55  othertypes = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
56  "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
57  "nav_msgs/OccupancyGrid", "geometry_msgs/Point32",
58  "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
59  "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
60  "sensor_msgs/PointCloud2"]
61 
62  s = MultiSubscriber(topic, msg_type)
63  s.verify_type(msg_type)
64  for othertype in othertypes:
65  self.assertRaises(TypeConflictException, s.verify_type, othertype)
66 
68  topic = "/test_subscribe_unsubscribe"
69  msg_type = "std_msgs/String"
70  client = "client_test_subscribe_unsubscribe"
71 
72  self.assertFalse(self.is_topic_subscribed(topic))
73  multi = MultiSubscriber(topic, msg_type)
74  self.assertTrue(self.is_topic_subscribed(topic))
75  self.assertFalse(multi.has_subscribers())
76 
77  multi.subscribe(client, None)
78  self.assertTrue(multi.has_subscribers())
79 
80  multi.unsubscribe(client)
81  self.assertFalse(multi.has_subscribers())
82 
83  multi.unregister()
84  self.assertFalse(self.is_topic_subscribed(topic))
85 
87  topic = "/test_subscribe_receive_json"
88  msg_type = "std_msgs/String"
89  client = "client_test_subscribe_receive_json"
90 
91  msg = String()
92  msg.data = "dsajfadsufasdjf"
93 
94  pub = rospy.Publisher(topic, String)
95  multi = MultiSubscriber(topic, msg_type)
96 
97  received = {"msg": None}
98 
99  def cb(msg):
100  received["msg"] = msg.get_json_values()
101 
102  multi.subscribe(client, cb)
103  sleep(0.5)
104  pub.publish(msg)
105  sleep(0.5)
106  self.assertEqual(msg.data, received["msg"]["data"])
107 
109  topic = "/test_subscribe_receive_json_multiple"
110  msg_type = "std_msgs/Int32"
111  client = "client_test_subscribe_receive_json_multiple"
112 
113  numbers = list(range(100))
114 
115  pub = rospy.Publisher(topic, Int32)
116  multi = MultiSubscriber(topic, msg_type)
117 
118  received = {"msgs": []}
119 
120  def cb(msg):
121  received["msgs"].append(msg.get_json_values()["data"])
122 
123  multi.subscribe(client, cb)
124  sleep(0.5)
125  for x in numbers:
126  msg = Int32()
127  msg.data = x
128  pub.publish(msg)
129  sleep(0.5)
130  self.assertEqual(numbers, received["msgs"])
131 
133  topic = "/test_unsubscribe_does_not_receive_further_msgs"
134  msg_type = "std_msgs/String"
135  client = "client_test_unsubscribe_does_not_receive_further_msgs"
136 
137  msg = String()
138  msg.data = "dsajfadsufasdjf"
139 
140  pub = rospy.Publisher(topic, String)
141  multi = MultiSubscriber(topic, msg_type)
142 
143  received = {"count": 0}
144 
145  def cb(msg):
146  received["count"] = received["count"] + 1
147 
148  multi.subscribe(client, cb)
149  sleep(0.5)
150  pub.publish(msg)
151  sleep(0.5)
152  self.assertEqual(received["count"], 1)
153  multi.unsubscribe(client)
154  sleep(0.5)
155  pub.publish(msg)
156  sleep(0.5)
157  self.assertEqual(received["count"], 1)
158 
160  topic = "/test_subscribe_receive_json"
161  msg_type = "std_msgs/String"
162  client1 = "client_test_subscribe_receive_json_1"
163  client2 = "client_test_subscribe_receive_json_2"
164 
165  msg = String()
166  msg.data = "dsajfadsufasdjf"
167 
168  pub = rospy.Publisher(topic, String)
169  multi = MultiSubscriber(topic, msg_type)
170 
171  received = {"msg1": None, "msg2": None}
172 
173  def cb1(msg):
174  received["msg1"] = msg.get_json_values()
175 
176  def cb2(msg):
177  received["msg2"] = msg.get_json_values()
178 
179  multi.subscribe(client1, cb1)
180  multi.subscribe(client2, cb2)
181  sleep(0.5)
182  pub.publish(msg)
183  sleep(0.5)
184  self.assertEqual(msg.data, received["msg1"]["data"])
185  self.assertEqual(msg.data, received["msg2"]["data"])
186 
187 
188 if __name__ == '__main__':
189  rosunit.unitrun(PKG, NAME, TestMultiSubscriber)
190 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri Oct 21 2022 02:45:18