test_multi_subscriber.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 PKG = 'rosbridge_library'
00003 import roslib; roslib.load_manifest(PKG); roslib.load_manifest("std_msgs")
00004 import rospy
00005 from rospy import get_published_topics
00006 from rosgraph import Master
00007 
00008 from time import sleep, time
00009 
00010 from rosbridge_library.internal.subscribers import *
00011 from rosbridge_library.internal.topics import *
00012 from rosbridge_library.internal import ros_loader
00013 from rosbridge_library.internal.message_conversion import *
00014 from std_msgs.msg import String, Int32
00015 
00016 import unittest
00017 
00018 class TestMultiSubscriber(unittest.TestCase):
00019 
00020     def setUp(self):
00021         rospy.init_node("test_multi_subscriber")
00022 
00023     def is_topic_published(self, topicname):
00024         return topicname in dict(get_published_topics()).keys()
00025 
00026     def is_topic_subscribed(self, topicname):
00027         return topicname in dict(Master("test_multi_subscriber").getSystemState()[1])
00028 
00029     def test_register_multisubscriber(self):
00030         """ Register a subscriber on a clean topic with a good msg type """
00031         topic = "/test_register_multisubscriber"
00032         msg_type = "std_msgs/String"
00033 
00034         self.assertFalse(self.is_topic_subscribed(topic))
00035         MultiSubscriber(topic, msg_type)
00036         self.assertTrue(self.is_topic_subscribed(topic))
00037 
00038     def test_unregister_multisubscriber(self):
00039         """ Register and unregister a subscriber on a clean topic with a good msg type """
00040         topic = "/test_unregister_multisubscriber"
00041         msg_type = "std_msgs/String"
00042 
00043         self.assertFalse(self.is_topic_subscribed(topic))
00044         multi = MultiSubscriber(topic, msg_type)
00045         self.assertTrue(self.is_topic_subscribed(topic))
00046         multi.unregister()
00047         self.assertFalse(self.is_topic_subscribed(topic))
00048 
00049     def test_verify_type(self):
00050         topic = "/test_verify_type"
00051         msg_type = "std_msgs/String"
00052         othertypes = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
00053         "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
00054         "nav_msgs/OccupancyGrid", "geometry_msgs/Point32",
00055         "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
00056         "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
00057         "sensor_msgs/PointCloud2"]
00058 
00059         s = MultiSubscriber(topic, msg_type)
00060         s.verify_type(msg_type)
00061         for othertype in othertypes:
00062             self.assertRaises(TypeConflictException, s.verify_type, othertype)
00063 
00064     def test_subscribe_unsubscribe(self):
00065         topic = "/test_subscribe_unsubscribe"
00066         msg_type = "std_msgs/String"
00067         client = "client_test_subscribe_unsubscribe"
00068 
00069         self.assertFalse(self.is_topic_subscribed(topic))
00070         multi = MultiSubscriber(topic, msg_type)
00071         self.assertTrue(self.is_topic_subscribed(topic))
00072         self.assertFalse(multi.has_subscribers())
00073 
00074         multi.subscribe(client, None)
00075         self.assertTrue(multi.has_subscribers())
00076 
00077         multi.unsubscribe(client)
00078         self.assertFalse(multi.has_subscribers())
00079 
00080         multi.unregister()
00081         self.assertFalse(self.is_topic_subscribed(topic))
00082 
00083     def test_subscribe_receive_json(self):
00084         topic = "/test_subscribe_receive_json"
00085         msg_type = "std_msgs/String"
00086         client = "client_test_subscribe_receive_json"
00087 
00088         msg = String()
00089         msg.data = "dsajfadsufasdjf"
00090 
00091         pub = rospy.Publisher(topic, String)
00092         multi = MultiSubscriber(topic, msg_type)
00093 
00094         received = {"msg": None}
00095         
00096         def cb(msg):
00097             received["msg"] = msg
00098 
00099         multi.subscribe(client, cb)
00100         sleep(0.5)
00101         pub.publish(msg)
00102         sleep(0.5)
00103         self.assertEqual(msg.data, received["msg"]["data"])
00104 
00105     def test_subscribe_receive_json_multiple(self):
00106         topic = "/test_subscribe_receive_json_multiple"
00107         msg_type = "std_msgs/Int32"
00108         client = "client_test_subscribe_receive_json_multiple"
00109 
00110         numbers = range(100)
00111 
00112         pub = rospy.Publisher(topic, Int32)
00113         multi = MultiSubscriber(topic, msg_type)
00114 
00115         received = {"msgs": []}
00116 
00117         def cb(msg):
00118             received["msgs"].append(msg["data"])
00119 
00120         multi.subscribe(client, cb)
00121         sleep(0.5)
00122         for x in numbers:
00123             msg = Int32()
00124             msg.data = x
00125             pub.publish(msg)
00126         sleep(0.5)
00127         self.assertEqual(numbers, received["msgs"])
00128 
00129     def test_unsubscribe_does_not_receive_further_msgs(self):
00130         topic = "/test_unsubscribe_does_not_receive_further_msgs"
00131         msg_type = "std_msgs/String"
00132         client = "client_test_unsubscribe_does_not_receive_further_msgs"
00133 
00134         msg = String()
00135         msg.data = "dsajfadsufasdjf"
00136 
00137         pub = rospy.Publisher(topic, String)
00138         multi = MultiSubscriber(topic, msg_type)
00139 
00140         received = {"count": 0}
00141 
00142         def cb(msg):
00143             received["count"] = received["count"] + 1
00144 
00145         multi.subscribe(client, cb)
00146         sleep(0.5)
00147         pub.publish(msg)
00148         sleep(0.5)
00149         self.assertEqual(received["count"], 1)
00150         multi.unsubscribe(client)
00151         sleep(0.5)
00152         pub.publish(msg)
00153         sleep(0.5)
00154         self.assertEqual(received["count"], 1)
00155 
00156     def test_multiple_subscribers(self):
00157         topic = "/test_subscribe_receive_json"
00158         msg_type = "std_msgs/String"
00159         client1 = "client_test_subscribe_receive_json_1"
00160         client2 = "client_test_subscribe_receive_json_2"
00161 
00162         msg = String()
00163         msg.data = "dsajfadsufasdjf"
00164 
00165         pub = rospy.Publisher(topic, String)
00166         multi = MultiSubscriber(topic, msg_type)
00167 
00168         received = {"msg1": None, "msg2": None}
00169 
00170         def cb1(msg):
00171             received["msg1"] = msg
00172 
00173         def cb2(msg):
00174             received["msg2"] = msg
00175 
00176         multi.subscribe(client1, cb1)
00177         multi.subscribe(client2, cb2)
00178         sleep(0.5)
00179         pub.publish(msg)
00180         sleep(0.5)
00181         self.assertEqual(msg.data, received["msg1"]["data"])
00182         self.assertEqual(msg.data, received["msg2"]["data"])
00183 
00184 if __name__ == '__main__':
00185     import rostest
00186     rostest.rosrun(PKG, 'test_multi_subscriber', TestMultiSubscriber)


rosbridge_test
Author(s): Jonathan Mace
autogenerated on Thu Jan 2 2014 11:54:04