6 from rosgraph
import Master
8 from time
import sleep, time
17 PKG =
'rosbridge_library' 18 NAME =
'test_multi_subscriber' 27 return topicname
in dict(rospy.get_published_topics()).keys()
30 return topicname
in dict(Master(
"test_multi_subscriber").getSystemState()[1])
33 """ Register a subscriber on a clean topic with a good msg type """ 34 topic =
"/test_register_multisubscriber" 35 msg_type =
"std_msgs/String" 42 """ Register and unregister a subscriber on a clean topic with a good msg type """ 43 topic =
"/test_unregister_multisubscriber" 44 msg_type =
"std_msgs/String" 53 topic =
"/test_verify_type" 54 msg_type =
"std_msgs/String" 55 othertypes = [
"geometry_msgs/Pose",
"actionlib_msgs/GoalStatus",
56 "geometry_msgs/WrenchStamped",
"stereo_msgs/DisparityImage",
57 "nav_msgs/OccupancyGrid",
"geometry_msgs/Point32",
58 "trajectory_msgs/JointTrajectoryPoint",
"diagnostic_msgs/KeyValue",
59 "visualization_msgs/InteractiveMarkerUpdate",
"nav_msgs/GridCells",
60 "sensor_msgs/PointCloud2"]
63 s.verify_type(msg_type)
64 for othertype
in othertypes:
65 self.assertRaises(TypeConflictException, s.verify_type, othertype)
68 topic =
"/test_subscribe_unsubscribe" 69 msg_type =
"std_msgs/String" 70 client =
"client_test_subscribe_unsubscribe" 75 self.assertFalse(multi.has_subscribers())
77 multi.subscribe(client,
None)
78 self.assertTrue(multi.has_subscribers())
80 multi.unsubscribe(client)
81 self.assertFalse(multi.has_subscribers())
87 topic =
"/test_subscribe_receive_json" 88 msg_type =
"std_msgs/String" 89 client =
"client_test_subscribe_receive_json" 92 msg.data =
"dsajfadsufasdjf" 94 pub = rospy.Publisher(topic, String)
97 received = {
"msg":
None}
100 received[
"msg"] = msg.get_json_values()
102 multi.subscribe(client, cb)
106 self.assertEqual(msg.data, received[
"msg"][
"data"])
109 topic =
"/test_subscribe_receive_json_multiple" 110 msg_type =
"std_msgs/Int32" 111 client =
"client_test_subscribe_receive_json_multiple" 113 numbers = list(range(100))
115 pub = rospy.Publisher(topic, Int32)
118 received = {
"msgs": []}
121 received[
"msgs"].append(msg.get_json_values()[
"data"])
123 multi.subscribe(client, cb)
130 self.assertEqual(numbers, received[
"msgs"])
133 topic =
"/test_unsubscribe_does_not_receive_further_msgs" 134 msg_type =
"std_msgs/String" 135 client =
"client_test_unsubscribe_does_not_receive_further_msgs" 138 msg.data =
"dsajfadsufasdjf" 140 pub = rospy.Publisher(topic, String)
143 received = {
"count": 0}
146 received[
"count"] = received[
"count"] + 1
148 multi.subscribe(client, cb)
152 self.assertEqual(received[
"count"], 1)
153 multi.unsubscribe(client)
157 self.assertEqual(received[
"count"], 1)
160 topic =
"/test_subscribe_receive_json" 161 msg_type =
"std_msgs/String" 162 client1 =
"client_test_subscribe_receive_json_1" 163 client2 =
"client_test_subscribe_receive_json_2" 166 msg.data =
"dsajfadsufasdjf" 168 pub = rospy.Publisher(topic, String)
171 received = {
"msg1":
None,
"msg2":
None}
174 received[
"msg1"] = msg.get_json_values()
177 received[
"msg2"] = msg.get_json_values()
179 multi.subscribe(client1, cb1)
180 multi.subscribe(client2, cb2)
184 self.assertEqual(msg.data, received[
"msg1"][
"data"])
185 self.assertEqual(msg.data, received[
"msg2"][
"data"])
188 if __name__ ==
'__main__':
189 rosunit.unitrun(PKG, NAME, TestMultiSubscriber)
def test_multiple_subscribers(self)
def test_subscribe_receive_json_multiple(self)
def test_subscribe_unsubscribe(self)
def test_unregister_multisubscriber(self)
def test_register_multisubscriber(self)
def is_topic_subscribed(self, topicname)
def is_topic_published(self, topicname)
def test_unsubscribe_does_not_receive_further_msgs(self)
def test_verify_type(self)
def test_subscribe_receive_json(self)