Go to the documentation of this file.00001
00002 PKG = 'rosbridge_library'
00003 import roslib; roslib.load_manifest(PKG); roslib.load_manifest("std_msgs")
00004 import rospy
00005 from rospy import get_published_topics
00006 from rosgraph import Master
00007
00008 from time import sleep, time
00009
00010 from rosbridge_library.internal.subscribers import *
00011 from rosbridge_library.internal.topics import *
00012 from rosbridge_library.internal import ros_loader
00013 from rosbridge_library.internal.message_conversion import *
00014 from std_msgs.msg import String, Int32
00015
00016 import unittest
00017
00018 class TestMultiSubscriber(unittest.TestCase):
00019
00020 def setUp(self):
00021 rospy.init_node("test_multi_subscriber")
00022
00023 def is_topic_published(self, topicname):
00024 return topicname in dict(get_published_topics()).keys()
00025
00026 def is_topic_subscribed(self, topicname):
00027 return topicname in dict(Master("test_multi_subscriber").getSystemState()[1])
00028
00029 def test_register_multisubscriber(self):
00030 """ Register a subscriber on a clean topic with a good msg type """
00031 topic = "/test_register_multisubscriber"
00032 msg_type = "std_msgs/String"
00033
00034 self.assertFalse(self.is_topic_subscribed(topic))
00035 MultiSubscriber(topic, msg_type)
00036 self.assertTrue(self.is_topic_subscribed(topic))
00037
00038 def test_unregister_multisubscriber(self):
00039 """ Register and unregister a subscriber on a clean topic with a good msg type """
00040 topic = "/test_unregister_multisubscriber"
00041 msg_type = "std_msgs/String"
00042
00043 self.assertFalse(self.is_topic_subscribed(topic))
00044 multi = MultiSubscriber(topic, msg_type)
00045 self.assertTrue(self.is_topic_subscribed(topic))
00046 multi.unregister()
00047 self.assertFalse(self.is_topic_subscribed(topic))
00048
00049 def test_verify_type(self):
00050 topic = "/test_verify_type"
00051 msg_type = "std_msgs/String"
00052 othertypes = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
00053 "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
00054 "nav_msgs/OccupancyGrid", "geometry_msgs/Point32",
00055 "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
00056 "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
00057 "sensor_msgs/PointCloud2"]
00058
00059 s = MultiSubscriber(topic, msg_type)
00060 s.verify_type(msg_type)
00061 for othertype in othertypes:
00062 self.assertRaises(TypeConflictException, s.verify_type, othertype)
00063
00064 def test_subscribe_unsubscribe(self):
00065 topic = "/test_subscribe_unsubscribe"
00066 msg_type = "std_msgs/String"
00067 client = "client_test_subscribe_unsubscribe"
00068
00069 self.assertFalse(self.is_topic_subscribed(topic))
00070 multi = MultiSubscriber(topic, msg_type)
00071 self.assertTrue(self.is_topic_subscribed(topic))
00072 self.assertFalse(multi.has_subscribers())
00073
00074 multi.subscribe(client, None)
00075 self.assertTrue(multi.has_subscribers())
00076
00077 multi.unsubscribe(client)
00078 self.assertFalse(multi.has_subscribers())
00079
00080 multi.unregister()
00081 self.assertFalse(self.is_topic_subscribed(topic))
00082
00083 def test_subscribe_receive_json(self):
00084 topic = "/test_subscribe_receive_json"
00085 msg_type = "std_msgs/String"
00086 client = "client_test_subscribe_receive_json"
00087
00088 msg = String()
00089 msg.data = "dsajfadsufasdjf"
00090
00091 pub = rospy.Publisher(topic, String)
00092 multi = MultiSubscriber(topic, msg_type)
00093
00094 received = {"msg": None}
00095
00096 def cb(msg):
00097 received["msg"] = msg
00098
00099 multi.subscribe(client, cb)
00100 sleep(0.5)
00101 pub.publish(msg)
00102 sleep(0.5)
00103 self.assertEqual(msg.data, received["msg"]["data"])
00104
00105 def test_subscribe_receive_json_multiple(self):
00106 topic = "/test_subscribe_receive_json_multiple"
00107 msg_type = "std_msgs/Int32"
00108 client = "client_test_subscribe_receive_json_multiple"
00109
00110 numbers = range(100)
00111
00112 pub = rospy.Publisher(topic, Int32)
00113 multi = MultiSubscriber(topic, msg_type)
00114
00115 received = {"msgs": []}
00116
00117 def cb(msg):
00118 received["msgs"].append(msg["data"])
00119
00120 multi.subscribe(client, cb)
00121 sleep(0.5)
00122 for x in numbers:
00123 msg = Int32()
00124 msg.data = x
00125 pub.publish(msg)
00126 sleep(0.5)
00127 self.assertEqual(numbers, received["msgs"])
00128
00129 def test_unsubscribe_does_not_receive_further_msgs(self):
00130 topic = "/test_unsubscribe_does_not_receive_further_msgs"
00131 msg_type = "std_msgs/String"
00132 client = "client_test_unsubscribe_does_not_receive_further_msgs"
00133
00134 msg = String()
00135 msg.data = "dsajfadsufasdjf"
00136
00137 pub = rospy.Publisher(topic, String)
00138 multi = MultiSubscriber(topic, msg_type)
00139
00140 received = {"count": 0}
00141
00142 def cb(msg):
00143 received["count"] = received["count"] + 1
00144
00145 multi.subscribe(client, cb)
00146 sleep(0.5)
00147 pub.publish(msg)
00148 sleep(0.5)
00149 self.assertEqual(received["count"], 1)
00150 multi.unsubscribe(client)
00151 sleep(0.5)
00152 pub.publish(msg)
00153 sleep(0.5)
00154 self.assertEqual(received["count"], 1)
00155
00156 def test_multiple_subscribers(self):
00157 topic = "/test_subscribe_receive_json"
00158 msg_type = "std_msgs/String"
00159 client1 = "client_test_subscribe_receive_json_1"
00160 client2 = "client_test_subscribe_receive_json_2"
00161
00162 msg = String()
00163 msg.data = "dsajfadsufasdjf"
00164
00165 pub = rospy.Publisher(topic, String)
00166 multi = MultiSubscriber(topic, msg_type)
00167
00168 received = {"msg1": None, "msg2": None}
00169
00170 def cb1(msg):
00171 received["msg1"] = msg
00172
00173 def cb2(msg):
00174 received["msg2"] = msg
00175
00176 multi.subscribe(client1, cb1)
00177 multi.subscribe(client2, cb2)
00178 sleep(0.5)
00179 pub.publish(msg)
00180 sleep(0.5)
00181 self.assertEqual(msg.data, received["msg1"]["data"])
00182 self.assertEqual(msg.data, received["msg2"]["data"])
00183
00184 if __name__ == '__main__':
00185 import rostest
00186 rostest.rosrun(PKG, 'test_multi_subscriber', TestMultiSubscriber)