test_multi_subscriber.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import rospy
4 import rostest
5 import unittest
6 from rosgraph import Master
7 
8 from time import sleep, time
9 
12 from rosbridge_library.internal import ros_loader
14 from std_msgs.msg import String, Int32
15 
16 
17 class TestMultiSubscriber(unittest.TestCase):
18 
19  def setUp(self):
20  rospy.init_node("test_multi_subscriber")
21 
22  def is_topic_published(self, topicname):
23  return topicname in dict(rospy.get_published_topics()).keys()
24 
25  def is_topic_subscribed(self, topicname):
26  return topicname in dict(Master("test_multi_subscriber").getSystemState()[1])
27 
29  """ Register a subscriber on a clean topic with a good msg type """
30  topic = "/test_register_multisubscriber"
31  msg_type = "std_msgs/String"
32 
33  self.assertFalse(self.is_topic_subscribed(topic))
34  MultiSubscriber(topic, msg_type)
35  self.assertTrue(self.is_topic_subscribed(topic))
36 
38  """ Register and unregister a subscriber on a clean topic with a good msg type """
39  topic = "/test_unregister_multisubscriber"
40  msg_type = "std_msgs/String"
41 
42  self.assertFalse(self.is_topic_subscribed(topic))
43  multi = MultiSubscriber(topic, msg_type)
44  self.assertTrue(self.is_topic_subscribed(topic))
45  multi.unregister()
46  self.assertFalse(self.is_topic_subscribed(topic))
47 
48  def test_verify_type(self):
49  topic = "/test_verify_type"
50  msg_type = "std_msgs/String"
51  othertypes = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
52  "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
53  "nav_msgs/OccupancyGrid", "geometry_msgs/Point32",
54  "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
55  "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
56  "sensor_msgs/PointCloud2"]
57 
58  s = MultiSubscriber(topic, msg_type)
59  s.verify_type(msg_type)
60  for othertype in othertypes:
61  self.assertRaises(TypeConflictException, s.verify_type, othertype)
62 
64  topic = "/test_subscribe_unsubscribe"
65  msg_type = "std_msgs/String"
66  client = "client_test_subscribe_unsubscribe"
67 
68  self.assertFalse(self.is_topic_subscribed(topic))
69  multi = MultiSubscriber(topic, msg_type)
70  self.assertTrue(self.is_topic_subscribed(topic))
71  self.assertFalse(multi.has_subscribers())
72 
73  multi.subscribe(client, None)
74  self.assertTrue(multi.has_subscribers())
75 
76  multi.unsubscribe(client)
77  self.assertFalse(multi.has_subscribers())
78 
79  multi.unregister()
80  self.assertFalse(self.is_topic_subscribed(topic))
81 
83  topic = "/test_subscribe_receive_json"
84  msg_type = "std_msgs/String"
85  client = "client_test_subscribe_receive_json"
86 
87  msg = String()
88  msg.data = "dsajfadsufasdjf"
89 
90  pub = rospy.Publisher(topic, String)
91  multi = MultiSubscriber(topic, msg_type)
92 
93  received = {"msg": None}
94 
95  def cb(msg):
96  received["msg"] = msg.get_json_values()
97 
98  multi.subscribe(client, cb)
99  sleep(0.5)
100  pub.publish(msg)
101  sleep(0.5)
102  self.assertEqual(msg.data, received["msg"]["data"])
103 
105  topic = "/test_subscribe_receive_json_multiple"
106  msg_type = "std_msgs/Int32"
107  client = "client_test_subscribe_receive_json_multiple"
108 
109  numbers = list(range(100))
110 
111  pub = rospy.Publisher(topic, Int32)
112  multi = MultiSubscriber(topic, msg_type)
113 
114  received = {"msgs": []}
115 
116  def cb(msg):
117  received["msgs"].append(msg.get_json_values()["data"])
118 
119  multi.subscribe(client, cb)
120  sleep(0.5)
121  for x in numbers:
122  msg = Int32()
123  msg.data = x
124  pub.publish(msg)
125  sleep(0.5)
126  self.assertEqual(numbers, received["msgs"])
127 
129  topic = "/test_unsubscribe_does_not_receive_further_msgs"
130  msg_type = "std_msgs/String"
131  client = "client_test_unsubscribe_does_not_receive_further_msgs"
132 
133  msg = String()
134  msg.data = "dsajfadsufasdjf"
135 
136  pub = rospy.Publisher(topic, String)
137  multi = MultiSubscriber(topic, msg_type)
138 
139  received = {"count": 0}
140 
141  def cb(msg):
142  received["count"] = received["count"] + 1
143 
144  multi.subscribe(client, cb)
145  sleep(0.5)
146  pub.publish(msg)
147  sleep(0.5)
148  self.assertEqual(received["count"], 1)
149  multi.unsubscribe(client)
150  sleep(0.5)
151  pub.publish(msg)
152  sleep(0.5)
153  self.assertEqual(received["count"], 1)
154 
156  topic = "/test_subscribe_receive_json"
157  msg_type = "std_msgs/String"
158  client1 = "client_test_subscribe_receive_json_1"
159  client2 = "client_test_subscribe_receive_json_2"
160 
161  msg = String()
162  msg.data = "dsajfadsufasdjf"
163 
164  pub = rospy.Publisher(topic, String)
165  multi = MultiSubscriber(topic, msg_type)
166 
167  received = {"msg1": None, "msg2": None}
168 
169  def cb1(msg):
170  received["msg1"] = msg.get_json_values()
171 
172  def cb2(msg):
173  received["msg2"] = msg.get_json_values()
174 
175  multi.subscribe(client1, cb1)
176  multi.subscribe(client2, cb2)
177  sleep(0.5)
178  pub.publish(msg)
179  sleep(0.5)
180  self.assertEqual(msg.data, received["msg1"]["data"])
181  self.assertEqual(msg.data, received["msg2"]["data"])
182 
183 
184 PKG = 'rosbridge_library'
185 NAME = 'test_multi_subscriber'
186 if __name__ == '__main__':
187  rostest.unitrun(PKG, NAME, TestMultiSubscriber)
188 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Wed Jun 3 2020 03:55:14