6 from rosgraph
import Master
8 from time
import sleep, time
17 PKG =
'rosbridge_library'
18 NAME =
'test_multi_subscriber'
27 return topicname
in dict(rospy.get_published_topics()).keys()
30 return topicname
in dict(Master(
"test_multi_subscriber").getSystemState()[1])
33 """ Register a subscriber on a clean topic with a good msg type """
34 topic =
"/test_register_multisubscriber"
35 msg_type =
"std_msgs/String"
42 """ Register and unregister a subscriber on a clean topic with a good msg type """
43 topic =
"/test_unregister_multisubscriber"
44 msg_type =
"std_msgs/String"
53 topic =
"/test_verify_type"
54 msg_type =
"std_msgs/String"
55 othertypes = [
"geometry_msgs/Pose",
"actionlib_msgs/GoalStatus",
56 "geometry_msgs/WrenchStamped",
"stereo_msgs/DisparityImage",
57 "nav_msgs/OccupancyGrid",
"geometry_msgs/Point32",
58 "trajectory_msgs/JointTrajectoryPoint",
"diagnostic_msgs/KeyValue",
59 "visualization_msgs/InteractiveMarkerUpdate",
"nav_msgs/GridCells",
60 "sensor_msgs/PointCloud2"]
63 s.verify_type(msg_type)
64 for othertype
in othertypes:
65 self.assertRaises(TypeConflictException, s.verify_type, othertype)
68 topic =
"/test_subscribe_unsubscribe"
69 msg_type =
"std_msgs/String"
70 client =
"client_test_subscribe_unsubscribe"
75 self.assertFalse(multi.has_subscribers())
77 multi.subscribe(client,
None)
78 self.assertTrue(multi.has_subscribers())
80 multi.unsubscribe(client)
81 self.assertFalse(multi.has_subscribers())
87 topic =
"/test_subscribe_receive_json"
88 msg_type =
"std_msgs/String"
89 client =
"client_test_subscribe_receive_json"
92 msg.data =
"dsajfadsufasdjf"
94 pub = rospy.Publisher(topic, String)
97 received = {
"msg":
None}
100 received[
"msg"] = msg.get_json_values()
102 multi.subscribe(client, cb)
106 self.assertEqual(msg.data, received[
"msg"][
"data"])
109 topic =
"/test_subscribe_receive_json_multiple"
110 msg_type =
"std_msgs/Int32"
111 client =
"client_test_subscribe_receive_json_multiple"
113 numbers = list(range(100))
115 pub = rospy.Publisher(topic, Int32)
118 received = {
"msgs": []}
121 received[
"msgs"].append(msg.get_json_values()[
"data"])
123 multi.subscribe(client, cb)
130 self.assertEqual(numbers, received[
"msgs"])
133 topic =
"/test_unsubscribe_does_not_receive_further_msgs"
134 msg_type =
"std_msgs/String"
135 client =
"client_test_unsubscribe_does_not_receive_further_msgs"
138 msg.data =
"dsajfadsufasdjf"
140 pub = rospy.Publisher(topic, String)
143 received = {
"count": 0}
146 received[
"count"] = received[
"count"] + 1
148 multi.subscribe(client, cb)
152 self.assertEqual(received[
"count"], 1)
153 multi.unsubscribe(client)
157 self.assertEqual(received[
"count"], 1)
160 topic =
"/test_subscribe_receive_json"
161 msg_type =
"std_msgs/String"
162 client1 =
"client_test_subscribe_receive_json_1"
163 client2 =
"client_test_subscribe_receive_json_2"
166 msg.data =
"dsajfadsufasdjf"
168 pub = rospy.Publisher(topic, String)
171 received = {
"msg1":
None,
"msg2":
None}
174 received[
"msg1"] = msg.get_json_values()
177 received[
"msg2"] = msg.get_json_values()
179 multi.subscribe(client1, cb1)
180 multi.subscribe(client2, cb2)
184 self.assertEqual(msg.data, received[
"msg1"][
"data"])
185 self.assertEqual(msg.data, received[
"msg2"][
"data"])
188 if __name__ ==
'__main__':
189 rosunit.unitrun(PKG, NAME, TestMultiSubscriber)