test_multi_subscriber.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 from rosgraph import Master
00007 
00008 from time import sleep, time
00009 
00010 from rosbridge_library.internal.subscribers import *
00011 from rosbridge_library.internal.topics import *
00012 from rosbridge_library.internal import ros_loader
00013 from rosbridge_library.internal.message_conversion import *
00014 from std_msgs.msg import String, Int32
00015 
00016 
00017 class TestMultiSubscriber(unittest.TestCase):
00018 
00019     def setUp(self):
00020         rospy.init_node("test_multi_subscriber")
00021 
00022     def is_topic_published(self, topicname):
00023         return topicname in dict(rospy.get_published_topics()).keys()
00024 
00025     def is_topic_subscribed(self, topicname):
00026         return topicname in dict(Master("test_multi_subscriber").getSystemState()[1])
00027 
00028     def test_register_multisubscriber(self):
00029         """ Register a subscriber on a clean topic with a good msg type """
00030         topic = "/test_register_multisubscriber"
00031         msg_type = "std_msgs/String"
00032 
00033         self.assertFalse(self.is_topic_subscribed(topic))
00034         MultiSubscriber(topic, msg_type)
00035         self.assertTrue(self.is_topic_subscribed(topic))
00036 
00037     def test_unregister_multisubscriber(self):
00038         """ Register and unregister a subscriber on a clean topic with a good msg type """
00039         topic = "/test_unregister_multisubscriber"
00040         msg_type = "std_msgs/String"
00041 
00042         self.assertFalse(self.is_topic_subscribed(topic))
00043         multi = MultiSubscriber(topic, msg_type)
00044         self.assertTrue(self.is_topic_subscribed(topic))
00045         multi.unregister()
00046         self.assertFalse(self.is_topic_subscribed(topic))
00047 
00048     def test_verify_type(self):
00049         topic = "/test_verify_type"
00050         msg_type = "std_msgs/String"
00051         othertypes = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
00052         "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
00053         "nav_msgs/OccupancyGrid", "geometry_msgs/Point32",
00054         "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
00055         "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
00056         "sensor_msgs/PointCloud2"]
00057 
00058         s = MultiSubscriber(topic, msg_type)
00059         s.verify_type(msg_type)
00060         for othertype in othertypes:
00061             self.assertRaises(TypeConflictException, s.verify_type, othertype)
00062 
00063     def test_subscribe_unsubscribe(self):
00064         topic = "/test_subscribe_unsubscribe"
00065         msg_type = "std_msgs/String"
00066         client = "client_test_subscribe_unsubscribe"
00067 
00068         self.assertFalse(self.is_topic_subscribed(topic))
00069         multi = MultiSubscriber(topic, msg_type)
00070         self.assertTrue(self.is_topic_subscribed(topic))
00071         self.assertFalse(multi.has_subscribers())
00072 
00073         multi.subscribe(client, None)
00074         self.assertTrue(multi.has_subscribers())
00075 
00076         multi.unsubscribe(client)
00077         self.assertFalse(multi.has_subscribers())
00078 
00079         multi.unregister()
00080         self.assertFalse(self.is_topic_subscribed(topic))
00081 
00082     def test_subscribe_receive_json(self):
00083         topic = "/test_subscribe_receive_json"
00084         msg_type = "std_msgs/String"
00085         client = "client_test_subscribe_receive_json"
00086 
00087         msg = String()
00088         msg.data = "dsajfadsufasdjf"
00089 
00090         pub = rospy.Publisher(topic, String)
00091         multi = MultiSubscriber(topic, msg_type)
00092 
00093         received = {"msg": None}
00094 
00095         def cb(msg):
00096             received["msg"] = msg
00097 
00098         multi.subscribe(client, cb)
00099         sleep(0.5)
00100         pub.publish(msg)
00101         sleep(0.5)
00102         self.assertEqual(msg.data, received["msg"]["data"])
00103 
00104     def test_subscribe_receive_json_multiple(self):
00105         topic = "/test_subscribe_receive_json_multiple"
00106         msg_type = "std_msgs/Int32"
00107         client = "client_test_subscribe_receive_json_multiple"
00108 
00109         numbers = list(range(100))
00110 
00111         pub = rospy.Publisher(topic, Int32)
00112         multi = MultiSubscriber(topic, msg_type)
00113 
00114         received = {"msgs": []}
00115 
00116         def cb(msg):
00117             received["msgs"].append(msg["data"])
00118 
00119         multi.subscribe(client, cb)
00120         sleep(0.5)
00121         for x in numbers:
00122             msg = Int32()
00123             msg.data = x
00124             pub.publish(msg)
00125         sleep(0.5)
00126         self.assertEqual(numbers, received["msgs"])
00127 
00128     def test_unsubscribe_does_not_receive_further_msgs(self):
00129         topic = "/test_unsubscribe_does_not_receive_further_msgs"
00130         msg_type = "std_msgs/String"
00131         client = "client_test_unsubscribe_does_not_receive_further_msgs"
00132 
00133         msg = String()
00134         msg.data = "dsajfadsufasdjf"
00135 
00136         pub = rospy.Publisher(topic, String)
00137         multi = MultiSubscriber(topic, msg_type)
00138 
00139         received = {"count": 0}
00140 
00141         def cb(msg):
00142             received["count"] = received["count"] + 1
00143 
00144         multi.subscribe(client, cb)
00145         sleep(0.5)
00146         pub.publish(msg)
00147         sleep(0.5)
00148         self.assertEqual(received["count"], 1)
00149         multi.unsubscribe(client)
00150         sleep(0.5)
00151         pub.publish(msg)
00152         sleep(0.5)
00153         self.assertEqual(received["count"], 1)
00154 
00155     def test_multiple_subscribers(self):
00156         topic = "/test_subscribe_receive_json"
00157         msg_type = "std_msgs/String"
00158         client1 = "client_test_subscribe_receive_json_1"
00159         client2 = "client_test_subscribe_receive_json_2"
00160 
00161         msg = String()
00162         msg.data = "dsajfadsufasdjf"
00163 
00164         pub = rospy.Publisher(topic, String)
00165         multi = MultiSubscriber(topic, msg_type)
00166 
00167         received = {"msg1": None, "msg2": None}
00168 
00169         def cb1(msg):
00170             received["msg1"] = msg
00171 
00172         def cb2(msg):
00173             received["msg2"] = msg
00174 
00175         multi.subscribe(client1, cb1)
00176         multi.subscribe(client2, cb2)
00177         sleep(0.5)
00178         pub.publish(msg)
00179         sleep(0.5)
00180         self.assertEqual(msg.data, received["msg1"]["data"])
00181         self.assertEqual(msg.data, received["msg2"]["data"])
00182 
00183 
00184 PKG = 'rosbridge_library'
00185 NAME = 'test_multi_subscriber'
00186 if __name__ == '__main__':
00187     rostest.unitrun(PKG, NAME, TestMultiSubscriber)
00188 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Thu Jun 6 2019 21:51:43