test_subscriber_manager.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 from rosgraph import Master
00007 
00008 from time import sleep
00009 
00010 from rosbridge_library.internal.subscribers import *
00011 from rosbridge_library.internal.topics import *
00012 from rosbridge_library.internal.message_conversion import FieldTypeMismatchException
00013 from std_msgs.msg import String, Int32
00014 
00015 
class TestSubscriberManager(unittest.TestCase):
    """Exercise the module-level subscriber ``manager`` against a ROS master.

    Every test uses its own topic name, and the tests that would otherwise
    leave a subscription, publisher or subscriber behind register a cleanup
    for it, so one test's state does not linger in the shared ``manager``
    or on the master.
    """

    def setUp(self):
        """Initialise the ROS node used by every test."""
        rospy.init_node("test_subscriber_manager")

    def is_topic_published(self, topicname):
        """Return True if ``topicname`` is among the published topics."""
        return topicname in dict(rospy.get_published_topics())

    def is_topic_subscribed(self, topicname):
        """Return True if the master lists a subscriber on ``topicname``."""
        # Index 1 of the master's system state is the subscribers section.
        subscribed = Master("test_subscriber_manager").getSystemState()[1]
        return topicname in dict(subscribed)

    def test_subscribe(self):
        """Subscribe to a clean topic with a good msg type, then unsubscribe."""
        topic = "/test_subscribe"
        msg_type = "std_msgs/String"
        client = "client_test_subscribe"

        self.assertNotIn(topic, manager._subscribers)
        self.assertFalse(self.is_topic_subscribed(topic))
        manager.subscribe(client, topic, None, msg_type)
        self.assertIn(topic, manager._subscribers)
        self.assertTrue(self.is_topic_subscribed(topic))

        manager.unsubscribe(client, topic)
        self.assertNotIn(topic, manager._subscribers)
        self.assertFalse(self.is_topic_subscribed(topic))

    def test_register_subscriber_multiclient(self):
        """The topic stays subscribed until the last of two clients leaves."""
        topic = "/test_register_subscriber_multiclient"
        msg_type = "std_msgs/String"
        client1 = "client_test_register_subscriber_multiclient_1"
        client2 = "client_test_register_subscriber_multiclient_2"

        self.assertNotIn(topic, manager._subscribers)
        self.assertFalse(self.is_topic_subscribed(topic))
        manager.subscribe(client1, topic, None, msg_type)
        self.assertIn(topic, manager._subscribers)
        self.assertTrue(self.is_topic_subscribed(topic))
        manager.subscribe(client2, topic, None, msg_type)
        self.assertIn(topic, manager._subscribers)
        self.assertTrue(self.is_topic_subscribed(topic))
        manager.unsubscribe(client1, topic)
        self.assertIn(topic, manager._subscribers)
        self.assertTrue(self.is_topic_subscribed(topic))
        manager.unsubscribe(client2, topic)
        self.assertNotIn(topic, manager._subscribers)
        self.assertFalse(self.is_topic_subscribed(topic))

    def test_register_publisher_conflicting_types(self):
        """Subscribing with a different msg type raises TypeConflictException."""
        topic = "/test_register_publisher_conflicting_types"
        msg_type = "std_msgs/String"
        msg_type_bad = "std_msgs/Int32"
        client = "client_test_register_publisher_conflicting_types"

        self.assertNotIn(topic, manager._subscribers)
        self.assertFalse(self.is_topic_subscribed(topic))
        manager.subscribe(client, topic, None, msg_type)
        # Release the subscription even if an assertion below fails.
        self.addCleanup(manager.unsubscribe, client, topic)
        self.assertIn(topic, manager._subscribers)
        self.assertTrue(self.is_topic_subscribed(topic))

        self.assertRaises(TypeConflictException, manager.subscribe,
                          "client2", topic, None, msg_type_bad)

    def test_register_multiple_publishers(self):
        """Subscriptions on two topics are tracked independently."""
        topic1 = "/test_register_multiple_publishers1"
        topic2 = "/test_register_multiple_publishers2"
        msg_type = "std_msgs/String"
        client = "client_test_register_multiple_publishers"

        self.assertNotIn(topic1, manager._subscribers)
        self.assertNotIn(topic2, manager._subscribers)
        self.assertFalse(self.is_topic_subscribed(topic1))
        self.assertFalse(self.is_topic_subscribed(topic2))
        manager.subscribe(client, topic1, None, msg_type)
        self.assertIn(topic1, manager._subscribers)
        self.assertTrue(self.is_topic_subscribed(topic1))
        self.assertNotIn(topic2, manager._subscribers)
        self.assertFalse(self.is_topic_subscribed(topic2))
        manager.subscribe(client, topic2, None, msg_type)
        self.assertIn(topic1, manager._subscribers)
        self.assertTrue(self.is_topic_subscribed(topic1))
        self.assertIn(topic2, manager._subscribers)
        self.assertTrue(self.is_topic_subscribed(topic2))

        manager.unsubscribe(client, topic1)
        self.assertNotIn(topic1, manager._subscribers)
        self.assertFalse(self.is_topic_subscribed(topic1))
        self.assertIn(topic2, manager._subscribers)
        self.assertTrue(self.is_topic_subscribed(topic2))

        manager.unsubscribe(client, topic2)
        self.assertNotIn(topic1, manager._subscribers)
        self.assertFalse(self.is_topic_subscribed(topic1))
        self.assertNotIn(topic2, manager._subscribers)
        self.assertFalse(self.is_topic_subscribed(topic2))

    def test_register_no_msgtype(self):
        """Subscribing without a msg type to an unknown topic raises."""
        topic = "/test_register_no_msgtype"
        client = "client_test_register_no_msgtype"

        self.assertNotIn(topic, manager._subscribers)
        self.assertFalse(self.is_topic_subscribed(topic))
        self.assertRaises(TopicNotEstablishedException, manager.subscribe,
                          client, topic, None)

    def test_register_infer_topictype(self):
        """Subscribing without a msg type works once the topic exists."""
        topic = "/test_register_infer_topictype"
        client = "client_test_register_infer_topictype"

        self.assertFalse(self.is_topic_subscribed(topic))

        # A plain rospy subscriber establishes the topic (and its type)
        # before the manager is asked to subscribe without a msg type.
        sub = rospy.Subscriber(topic, String, None)
        self.addCleanup(sub.unregister)

        self.assertTrue(self.is_topic_subscribed(topic))
        self.assertNotIn(topic, manager._subscribers)
        manager.subscribe(client, topic, None)
        self.assertIn(topic, manager._subscribers)
        self.assertTrue(self.is_topic_subscribed(topic))

        manager.unsubscribe(client, topic)
        self.assertNotIn(topic, manager._subscribers)
        # The rospy subscriber created above is still registered.
        self.assertTrue(self.is_topic_subscribed(topic))

    def test_register_multiple_notopictype(self):
        """A second client may omit the msg type once the first supplied it."""
        topic = "/test_register_multiple_notopictype"
        msg_type = "std_msgs/String"
        client1 = "client_test_register_multiple_notopictype_1"
        client2 = "client_test_register_multiple_notopictype_2"

        self.assertNotIn(topic, manager._subscribers)
        self.assertFalse(self.is_topic_subscribed(topic))
        manager.subscribe(client1, topic, None, msg_type)
        self.assertIn(topic, manager._subscribers)
        self.assertTrue(self.is_topic_subscribed(topic))
        manager.subscribe(client2, topic, None)
        self.assertIn(topic, manager._subscribers)
        self.assertTrue(self.is_topic_subscribed(topic))
        manager.unsubscribe(client1, topic)
        self.assertIn(topic, manager._subscribers)
        self.assertTrue(self.is_topic_subscribed(topic))
        manager.unsubscribe(client2, topic)
        self.assertNotIn(topic, manager._subscribers)
        self.assertFalse(self.is_topic_subscribed(topic))

    def test_subscribe_not_registered(self):
        """Subscribing to a topic nobody has established raises."""
        topic = "/test_subscribe_not_registered"
        client = "client_test_subscribe_not_registered"

        self.assertNotIn(topic, manager._subscribers)
        self.assertFalse(self.is_topic_subscribed(topic))
        self.assertRaises(TopicNotEstablishedException, manager.subscribe,
                          client, topic, None)

    def test_publisher_manager_publish(self):
        """A message published on the topic reaches the subscribed callback."""
        topic = "/test_publisher_manager_publish"
        msg_type = "std_msgs/String"
        client = "client_test_publisher_manager_publish"

        msg = String()
        msg.data = "dsajfadsufasdjf"

        pub = rospy.Publisher(topic, String)
        self.addCleanup(pub.unregister)
        # Mutable holder so the nested callback can hand its argument back.
        received = {"msg": None}

        def cb(msg):
            received["msg"] = msg

        manager.subscribe(client, topic, cb, msg_type)
        self.addCleanup(manager.unsubscribe, client, topic)
        # Pause before and after publishing so the connection can be set up
        # and the callback has a chance to run.
        sleep(0.5)
        pub.publish(msg)
        sleep(0.5)
        # Fail with a clear message rather than a TypeError on None below.
        self.assertIsNotNone(received["msg"],
                             "no message was delivered to the callback")
        self.assertEqual(msg.data, received["msg"]["data"])
00185 
00186 
# Package and test name handed to rostest when this file is run as a script.
PKG = "rosbridge_library"
NAME = "test_subscriber_manager"

if __name__ == "__main__":
    rostest.unitrun(PKG, NAME, TestSubscriberManager)
00191 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Thu Jun 6 2019 21:51:43