Go to the documentation of this file.00001
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 from rosgraph import Master
00007
00008 from time import sleep, time
00009
00010 from rosbridge_library.internal.subscribers import *
00011 from rosbridge_library.internal.topics import *
00012 from rosbridge_library.internal import ros_loader
00013 from rosbridge_library.internal.message_conversion import *
00014 from std_msgs.msg import String, Int32
00015
00016
00017 class TestMultiSubscriber(unittest.TestCase):
00018
00019 def setUp(self):
00020 rospy.init_node("test_multi_subscriber")
00021
00022 def is_topic_published(self, topicname):
00023 return topicname in dict(rospy.get_published_topics()).keys()
00024
00025 def is_topic_subscribed(self, topicname):
00026 return topicname in dict(Master("test_multi_subscriber").getSystemState()[1])
00027
00028 def test_register_multisubscriber(self):
00029 """ Register a subscriber on a clean topic with a good msg type """
00030 topic = "/test_register_multisubscriber"
00031 msg_type = "std_msgs/String"
00032
00033 self.assertFalse(self.is_topic_subscribed(topic))
00034 MultiSubscriber(topic, msg_type)
00035 self.assertTrue(self.is_topic_subscribed(topic))
00036
00037 def test_unregister_multisubscriber(self):
00038 """ Register and unregister a subscriber on a clean topic with a good msg type """
00039 topic = "/test_unregister_multisubscriber"
00040 msg_type = "std_msgs/String"
00041
00042 self.assertFalse(self.is_topic_subscribed(topic))
00043 multi = MultiSubscriber(topic, msg_type)
00044 self.assertTrue(self.is_topic_subscribed(topic))
00045 multi.unregister()
00046 self.assertFalse(self.is_topic_subscribed(topic))
00047
00048 def test_verify_type(self):
00049 topic = "/test_verify_type"
00050 msg_type = "std_msgs/String"
00051 othertypes = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
00052 "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
00053 "nav_msgs/OccupancyGrid", "geometry_msgs/Point32",
00054 "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
00055 "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
00056 "sensor_msgs/PointCloud2"]
00057
00058 s = MultiSubscriber(topic, msg_type)
00059 s.verify_type(msg_type)
00060 for othertype in othertypes:
00061 self.assertRaises(TypeConflictException, s.verify_type, othertype)
00062
00063 def test_subscribe_unsubscribe(self):
00064 topic = "/test_subscribe_unsubscribe"
00065 msg_type = "std_msgs/String"
00066 client = "client_test_subscribe_unsubscribe"
00067
00068 self.assertFalse(self.is_topic_subscribed(topic))
00069 multi = MultiSubscriber(topic, msg_type)
00070 self.assertTrue(self.is_topic_subscribed(topic))
00071 self.assertFalse(multi.has_subscribers())
00072
00073 multi.subscribe(client, None)
00074 self.assertTrue(multi.has_subscribers())
00075
00076 multi.unsubscribe(client)
00077 self.assertFalse(multi.has_subscribers())
00078
00079 multi.unregister()
00080 self.assertFalse(self.is_topic_subscribed(topic))
00081
00082 def test_subscribe_receive_json(self):
00083 topic = "/test_subscribe_receive_json"
00084 msg_type = "std_msgs/String"
00085 client = "client_test_subscribe_receive_json"
00086
00087 msg = String()
00088 msg.data = "dsajfadsufasdjf"
00089
00090 pub = rospy.Publisher(topic, String)
00091 multi = MultiSubscriber(topic, msg_type)
00092
00093 received = {"msg": None}
00094
00095 def cb(msg):
00096 received["msg"] = msg
00097
00098 multi.subscribe(client, cb)
00099 sleep(0.5)
00100 pub.publish(msg)
00101 sleep(0.5)
00102 self.assertEqual(msg.data, received["msg"]["data"])
00103
00104 def test_subscribe_receive_json_multiple(self):
00105 topic = "/test_subscribe_receive_json_multiple"
00106 msg_type = "std_msgs/Int32"
00107 client = "client_test_subscribe_receive_json_multiple"
00108
00109 numbers = range(100)
00110
00111 pub = rospy.Publisher(topic, Int32)
00112 multi = MultiSubscriber(topic, msg_type)
00113
00114 received = {"msgs": []}
00115
00116 def cb(msg):
00117 received["msgs"].append(msg["data"])
00118
00119 multi.subscribe(client, cb)
00120 sleep(0.5)
00121 for x in numbers:
00122 msg = Int32()
00123 msg.data = x
00124 pub.publish(msg)
00125 sleep(0.5)
00126 self.assertEqual(numbers, received["msgs"])
00127
00128 def test_unsubscribe_does_not_receive_further_msgs(self):
00129 topic = "/test_unsubscribe_does_not_receive_further_msgs"
00130 msg_type = "std_msgs/String"
00131 client = "client_test_unsubscribe_does_not_receive_further_msgs"
00132
00133 msg = String()
00134 msg.data = "dsajfadsufasdjf"
00135
00136 pub = rospy.Publisher(topic, String)
00137 multi = MultiSubscriber(topic, msg_type)
00138
00139 received = {"count": 0}
00140
00141 def cb(msg):
00142 received["count"] = received["count"] + 1
00143
00144 multi.subscribe(client, cb)
00145 sleep(0.5)
00146 pub.publish(msg)
00147 sleep(0.5)
00148 self.assertEqual(received["count"], 1)
00149 multi.unsubscribe(client)
00150 sleep(0.5)
00151 pub.publish(msg)
00152 sleep(0.5)
00153 self.assertEqual(received["count"], 1)
00154
00155 def test_multiple_subscribers(self):
00156 topic = "/test_subscribe_receive_json"
00157 msg_type = "std_msgs/String"
00158 client1 = "client_test_subscribe_receive_json_1"
00159 client2 = "client_test_subscribe_receive_json_2"
00160
00161 msg = String()
00162 msg.data = "dsajfadsufasdjf"
00163
00164 pub = rospy.Publisher(topic, String)
00165 multi = MultiSubscriber(topic, msg_type)
00166
00167 received = {"msg1": None, "msg2": None}
00168
00169 def cb1(msg):
00170 received["msg1"] = msg
00171
00172 def cb2(msg):
00173 received["msg2"] = msg
00174
00175 multi.subscribe(client1, cb1)
00176 multi.subscribe(client2, cb2)
00177 sleep(0.5)
00178 pub.publish(msg)
00179 sleep(0.5)
00180 self.assertEqual(msg.data, received["msg1"]["data"])
00181 self.assertEqual(msg.data, received["msg2"]["data"])
00182
00183
00184 PKG = 'rosbridge_library'
00185 NAME = 'test_multi_subscriber'
00186 if __name__ == '__main__':
00187 rostest.unitrun(PKG, NAME, TestMultiSubscriber)
00188