6 from rosgraph
import Master
8 from time
import sleep, time
20 rospy.init_node(
"test_multi_subscriber")
23 return topicname
in dict(rospy.get_published_topics()).keys()
26 return topicname
in dict(Master(
"test_multi_subscriber").getSystemState()[1])
29 """ Register a subscriber on a clean topic with a good msg type """ 30 topic =
"/test_register_multisubscriber" 31 msg_type =
"std_msgs/String" 38 """ Register and unregister a subscriber on a clean topic with a good msg type """ 39 topic =
"/test_unregister_multisubscriber" 40 msg_type =
"std_msgs/String" 49 topic =
"/test_verify_type" 50 msg_type =
"std_msgs/String" 51 othertypes = [
"geometry_msgs/Pose",
"actionlib_msgs/GoalStatus",
52 "geometry_msgs/WrenchStamped",
"stereo_msgs/DisparityImage",
53 "nav_msgs/OccupancyGrid",
"geometry_msgs/Point32",
54 "trajectory_msgs/JointTrajectoryPoint",
"diagnostic_msgs/KeyValue",
55 "visualization_msgs/InteractiveMarkerUpdate",
"nav_msgs/GridCells",
56 "sensor_msgs/PointCloud2"]
59 s.verify_type(msg_type)
60 for othertype
in othertypes:
61 self.assertRaises(TypeConflictException, s.verify_type, othertype)
64 topic =
"/test_subscribe_unsubscribe" 65 msg_type =
"std_msgs/String" 66 client =
"client_test_subscribe_unsubscribe" 71 self.assertFalse(multi.has_subscribers())
73 multi.subscribe(client,
None)
74 self.assertTrue(multi.has_subscribers())
76 multi.unsubscribe(client)
77 self.assertFalse(multi.has_subscribers())
83 topic =
"/test_subscribe_receive_json" 84 msg_type =
"std_msgs/String" 85 client =
"client_test_subscribe_receive_json" 88 msg.data =
"dsajfadsufasdjf" 90 pub = rospy.Publisher(topic, String)
93 received = {
"msg":
None}
96 received[
"msg"] = msg.get_json_values()
98 multi.subscribe(client, cb)
102 self.assertEqual(msg.data, received[
"msg"][
"data"])
105 topic =
"/test_subscribe_receive_json_multiple" 106 msg_type =
"std_msgs/Int32" 107 client =
"client_test_subscribe_receive_json_multiple" 109 numbers = list(range(100))
111 pub = rospy.Publisher(topic, Int32)
114 received = {
"msgs": []}
117 received[
"msgs"].append(msg.get_json_values()[
"data"])
119 multi.subscribe(client, cb)
126 self.assertEqual(numbers, received[
"msgs"])
129 topic =
"/test_unsubscribe_does_not_receive_further_msgs" 130 msg_type =
"std_msgs/String" 131 client =
"client_test_unsubscribe_does_not_receive_further_msgs" 134 msg.data =
"dsajfadsufasdjf" 136 pub = rospy.Publisher(topic, String)
139 received = {
"count": 0}
142 received[
"count"] = received[
"count"] + 1
144 multi.subscribe(client, cb)
148 self.assertEqual(received[
"count"], 1)
149 multi.unsubscribe(client)
153 self.assertEqual(received[
"count"], 1)
156 topic =
"/test_subscribe_receive_json" 157 msg_type =
"std_msgs/String" 158 client1 =
"client_test_subscribe_receive_json_1" 159 client2 =
"client_test_subscribe_receive_json_2" 162 msg.data =
"dsajfadsufasdjf" 164 pub = rospy.Publisher(topic, String)
167 received = {
"msg1":
None,
"msg2":
None}
170 received[
"msg1"] = msg.get_json_values()
173 received[
"msg2"] = msg.get_json_values()
175 multi.subscribe(client1, cb1)
176 multi.subscribe(client2, cb2)
180 self.assertEqual(msg.data, received[
"msg1"][
"data"])
181 self.assertEqual(msg.data, received[
"msg2"][
"data"])
184 PKG =
'rosbridge_library' 185 NAME =
'test_multi_subscriber' 186 if __name__ ==
'__main__':
187 rostest.unitrun(PKG, NAME, TestMultiSubscriber)
def test_multiple_subscribers(self)
def test_subscribe_receive_json_multiple(self)
def test_subscribe_unsubscribe(self)
def test_unregister_multisubscriber(self)
def test_register_multisubscriber(self)
def is_topic_subscribed(self, topicname)
def is_topic_published(self, topicname)
def test_unsubscribe_does_not_receive_further_msgs(self)
def test_verify_type(self)
def test_subscribe_receive_json(self)