test_subscriber_manager.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 PKG = 'rosbridge_library'
00003 import roslib; roslib.load_manifest(PKG); roslib.load_manifest("std_msgs")
00004 import rospy
00005 from rosgraph import Master
00006 
00007 from time import sleep
00008 
00009 from rosbridge_library.internal.subscribers import *
00010 from rosbridge_library.internal.topics import *
00011 from rosbridge_library.internal.message_conversion import FieldTypeMismatchException
00012 from std_msgs.msg import String, Int32
00013 
00014 import unittest
00015 
00016 
class TestSubscriberManager(unittest.TestCase):
    """Tests for the rosbridge subscriber ``manager``.

    Every test checks two views of the same state: the manager's own
    bookkeeping (``manager._subscribers``) and the subscriber list reported
    by the ROS master.  A running master is therefore required.

    Several method names mention "publisher" although all of them drive
    ``manager.subscribe`` / ``manager.unsubscribe``.  The names are kept
    unchanged so that existing test selections keep working.
    """

    def setUp(self):
        """Initialise the ROS node that the tests run in."""
        rospy.init_node("test_subscriber_manager")

    def is_topic_published(self, topicname):
        """Return True if ``topicname`` is listed by ``rospy.get_published_topics()``."""
        return topicname in dict(rospy.get_published_topics())

    def is_topic_subscribed(self, topicname):
        """Return True if the master's system state lists ``topicname`` as subscribed."""
        # Element 1 of getSystemState() is taken to be the subscriber list
        # (the Master API orders it publishers, subscribers, services).
        system_state = Master("test_subscriber_manager").getSystemState()
        return topicname in dict(system_state[1])

    def _assert_subscribed(self, topic):
        """Assert that both the manager and the master know ``topic``."""
        self.assertIn(topic, manager._subscribers)
        self.assertTrue(self.is_topic_subscribed(topic))

    def _assert_not_subscribed(self, topic):
        """Assert that neither the manager nor the master knows ``topic``."""
        self.assertNotIn(topic, manager._subscribers)
        self.assertFalse(self.is_topic_subscribed(topic))

    def test_subscribe(self):
        """Subscribe one client to a fresh topic with a valid type, then unsubscribe."""
        topic = "/test_subscribe"
        msg_type = "std_msgs/String"
        client = "client_test_subscribe"

        self._assert_not_subscribed(topic)
        manager.subscribe(client, topic, None, msg_type)
        self._assert_subscribed(topic)

        manager.unsubscribe(client, topic)
        self._assert_not_subscribed(topic)

    def test_register_subscriber_multiclient(self):
        """The topic stays subscribed until the last of two clients unsubscribes."""
        topic = "/test_register_subscriber_multiclient"
        msg_type = "std_msgs/String"
        client1 = "client_test_register_subscriber_multiclient_1"
        client2 = "client_test_register_subscriber_multiclient_2"

        self._assert_not_subscribed(topic)
        manager.subscribe(client1, topic, None, msg_type)
        self._assert_subscribed(topic)
        manager.subscribe(client2, topic, None, msg_type)
        self._assert_subscribed(topic)

        manager.unsubscribe(client1, topic)
        self._assert_subscribed(topic)
        manager.unsubscribe(client2, topic)
        self._assert_not_subscribed(topic)

    def test_register_publisher_conflicting_types(self):
        """A second subscription with a different type raises TypeConflictException."""
        topic = "/test_register_publisher_conflicting_types"
        msg_type = "std_msgs/String"
        msg_type_bad = "std_msgs/Int32"
        client = "client_test_register_publisher_conflicting_types"

        self._assert_not_subscribed(topic)
        manager.subscribe(client, topic, None, msg_type)
        # Release the subscription when the test ends so that it does not
        # linger in the shared manager for the rest of the run.
        self.addCleanup(manager.unsubscribe, client, topic)
        self._assert_subscribed(topic)

        self.assertRaises(TypeConflictException, manager.subscribe,
                          "client2", topic, None, msg_type_bad)

    def test_register_multiple_publishers(self):
        """Subscriptions on two topics by one client are tracked independently."""
        topic1 = "/test_register_multiple_publishers1"
        topic2 = "/test_register_multiple_publishers2"
        msg_type = "std_msgs/String"
        client = "client_test_register_multiple_publishers"

        self._assert_not_subscribed(topic1)
        self._assert_not_subscribed(topic2)

        manager.subscribe(client, topic1, None, msg_type)
        self._assert_subscribed(topic1)
        self._assert_not_subscribed(topic2)

        manager.subscribe(client, topic2, None, msg_type)
        self._assert_subscribed(topic1)
        self._assert_subscribed(topic2)

        manager.unsubscribe(client, topic1)
        self._assert_not_subscribed(topic1)
        self._assert_subscribed(topic2)

        manager.unsubscribe(client, topic2)
        self._assert_not_subscribed(topic1)
        self._assert_not_subscribed(topic2)

    def test_register_no_msgtype(self):
        """Subscribing to an unknown topic without a type raises.

        This is the same scenario as ``test_subscribe_not_registered``, run
        on a different topic name.
        """
        topic = "/test_register_no_msgtype"
        client = "client_test_register_no_msgtype"

        self._assert_not_subscribed(topic)
        self.assertRaises(TopicNotEstablishedException,
                          manager.subscribe, client, topic, None)

    def test_register_infer_topictype(self):
        """Subscribing without a type works once the topic already exists in ROS."""
        topic = "/test_register_infer_topictype"
        client = "client_test_register_infer_topictype"

        self.assertFalse(self.is_topic_subscribed(topic))

        # A plain rospy subscriber establishes the topic and its type.
        raw_subscriber = rospy.Subscriber(topic, String, None)
        # Unregister it when the test ends so that it does not outlive the test.
        self.addCleanup(raw_subscriber.unregister)

        self.assertTrue(self.is_topic_subscribed(topic))
        self.assertNotIn(topic, manager._subscribers)
        manager.subscribe(client, topic, None)
        self._assert_subscribed(topic)

        manager.unsubscribe(client, topic)
        self.assertNotIn(topic, manager._subscribers)
        # The plain rospy subscriber is still registered at this point.
        self.assertTrue(self.is_topic_subscribed(topic))

    def test_register_multiple_notopictype(self):
        """A second client may omit the type once the first has supplied it."""
        topic = "/test_register_multiple_notopictype"
        msg_type = "std_msgs/String"
        client1 = "client_test_register_multiple_notopictype_1"
        client2 = "client_test_register_multiple_notopictype_2"

        self._assert_not_subscribed(topic)
        manager.subscribe(client1, topic, None, msg_type)
        self._assert_subscribed(topic)
        manager.subscribe(client2, topic, None)
        self._assert_subscribed(topic)

        manager.unsubscribe(client1, topic)
        self._assert_subscribed(topic)
        manager.unsubscribe(client2, topic)
        self._assert_not_subscribed(topic)

    def test_subscribe_not_registered(self):
        """Subscribing to an unknown topic without a type raises."""
        topic = "/test_subscribe_not_registered"
        client = "client_test_subscribe_not_registered"

        self._assert_not_subscribed(topic)
        self.assertRaises(TopicNotEstablishedException,
                          manager.subscribe, client, topic, None)

    def test_publisher_manager_publish(self):
        """A message published on the topic reaches the callback given to the manager."""
        topic = "/test_publisher_manager_publish"
        msg_type = "std_msgs/String"
        client = "client_test_publisher_manager_publish"

        msg = String()
        msg.data = "dsajfadsufasdjf"

        pub = rospy.Publisher(topic, String)
        self.addCleanup(pub.unregister)
        received = {"msg": None}

        def cb(incoming):
            received["msg"] = incoming

        manager.subscribe(client, topic, cb, msg_type)
        self.addCleanup(manager.unsubscribe, client, topic)
        # Sleep before publishing and again before checking the result, as
        # the original test does, to allow for connection and delivery.
        sleep(0.5)
        pub.publish(msg)
        sleep(0.5)

        # Fail with a clear message instead of a TypeError from indexing None.
        self.assertIsNotNone(received["msg"],
                             "no message was delivered to the callback")
        self.assertEqual(msg.data, received["msg"]["data"])
00186 
00187 
if __name__ == '__main__':
    # rostest is only needed when this file is executed directly, so it is
    # imported here rather than at module level.
    from rostest import rosrun
    rosrun(PKG, 'test_subscriber_manager', TestSubscriberManager)


rosbridge_test
Author(s): Jonathan Mace
autogenerated on Thu Jan 2 2014 11:54:04