test_publisher_manager.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 
00007 from time import sleep
00008 
00009 from rosbridge_library.internal.publishers import *
00010 from rosbridge_library.internal.topics import *
00011 from rosbridge_library.internal.message_conversion import FieldTypeMismatchException
00012 from std_msgs.msg import String, Int32
00013 
00014 
00015 class TestPublisherManager(unittest.TestCase):
00016 
00017     def setUp(self):
00018         rospy.init_node("test_publisher_manager")
00019 
00020     def is_topic_published(self, topicname):
00021         return topicname in dict(rospy.get_published_topics()).keys()
00022 
00023     def test_register_publisher(self):
00024         """ Register a publisher on a clean topic with a good msg type """
00025         topic = "/test_register_publisher"
00026         msg_type = "std_msgs/String"
00027         client = "client_test_register_publisher"
00028 
00029         self.assertFalse(topic in manager._publishers)
00030         self.assertFalse(self.is_topic_published(topic))
00031         manager.register(client, topic, msg_type)
00032         self.assertTrue(topic in manager._publishers)
00033         self.assertTrue(self.is_topic_published(topic))
00034 
00035         manager.unregister(client, topic)
00036         self.assertFalse(topic in manager._publishers)
00037         self.assertFalse(self.is_topic_published(topic))
00038 
00039     def test_register_publisher_multiclient(self):
00040         topic = "/test_register_publisher_multiclient"
00041         msg_type = "std_msgs/String"
00042         client1 = "client_test_register_publisher_1"
00043         client2 = "client_test_register_publisher_2"
00044 
00045         self.assertFalse(topic in manager._publishers)
00046         self.assertFalse(self.is_topic_published(topic))
00047         manager.register(client1, topic, msg_type)
00048         self.assertTrue(topic in manager._publishers)
00049         self.assertTrue(self.is_topic_published(topic))
00050         manager.register(client2, topic, msg_type)
00051         self.assertTrue(topic in manager._publishers)
00052         self.assertTrue(self.is_topic_published(topic))
00053         manager.unregister(client1, topic)
00054         self.assertTrue(topic in manager._publishers)
00055         self.assertTrue(self.is_topic_published(topic))
00056         manager.unregister(client2, topic)
00057         self.assertFalse(topic in manager._publishers)
00058         self.assertFalse(self.is_topic_published(topic))
00059 
00060     def test_register_publisher_conflicting_types(self):
00061         topic = "/test_register_publisher_conflicting_types"
00062         msg_type = "std_msgs/String"
00063         msg_type_bad = "std_msgs/Int32"
00064         client = "client_test_register_publisher_conflicting_types"
00065 
00066         self.assertFalse(topic in manager._publishers)
00067         self.assertFalse(self.is_topic_published(topic))
00068         manager.register(client, topic, msg_type)
00069         self.assertTrue(topic in manager._publishers)
00070         self.assertTrue(self.is_topic_published(topic))
00071 
00072         self.assertRaises(TypeConflictException, manager.register, "client2", topic, msg_type_bad)
00073 
00074     def test_register_multiple_publishers(self):
00075         topic1 = "/test_register_multiple_publishers1"
00076         topic2 = "/test_register_multiple_publishers2"
00077         msg_type = "std_msgs/String"
00078         client = "client_test_register_multiple_publishers"
00079 
00080         self.assertFalse(topic1 in manager._publishers)
00081         self.assertFalse(topic2 in manager._publishers)
00082         self.assertFalse(self.is_topic_published(topic1))
00083         self.assertFalse(self.is_topic_published(topic2))
00084         manager.register(client, topic1, msg_type)
00085         self.assertTrue(topic1 in manager._publishers)
00086         self.assertTrue(self.is_topic_published(topic1))
00087         self.assertFalse(topic2 in manager._publishers)
00088         self.assertFalse(self.is_topic_published(topic2))
00089         manager.register(client, topic2, msg_type)
00090         self.assertTrue(topic1 in manager._publishers)
00091         self.assertTrue(self.is_topic_published(topic1))
00092         self.assertTrue(topic2 in manager._publishers)
00093         self.assertTrue(self.is_topic_published(topic2))
00094 
00095         manager.unregister(client, topic1)
00096         self.assertFalse(topic1 in manager._publishers)
00097         self.assertFalse(self.is_topic_published(topic1))
00098         self.assertTrue(topic2 in manager._publishers)
00099         self.assertTrue(self.is_topic_published(topic2))
00100 
00101         manager.unregister(client, topic2)
00102         self.assertFalse(topic1 in manager._publishers)
00103         self.assertFalse(self.is_topic_published(topic1))
00104         self.assertFalse(topic2 in manager._publishers)
00105         self.assertFalse(self.is_topic_published(topic2))
00106 
00107     def test_register_no_msgtype(self):
00108         topic = "/test_register_no_msgtype"
00109         client = "client_test_register_no_msgtype"
00110 
00111         self.assertFalse(topic in manager._publishers)
00112         self.assertFalse(self.is_topic_published(topic))
00113         self.assertRaises(TopicNotEstablishedException, manager.register, client, topic)
00114 
00115     def test_register_infer_topictype(self):
00116         topic = "/test_register_infer_topictype"
00117         client = "client_test_register_infer_topictype"
00118 
00119         self.assertFalse(self.is_topic_published(topic))
00120 
00121         rospy.Publisher(topic, String)
00122 
00123         self.assertTrue(self.is_topic_published(topic))
00124         self.assertFalse(topic in manager._publishers)
00125         manager.register(client, topic)
00126         self.assertTrue(topic in manager._publishers)
00127         self.assertTrue(self.is_topic_published(topic))
00128 
00129         manager.unregister(client, topic)
00130         self.assertFalse(topic in manager._publishers)
00131         self.assertTrue(self.is_topic_published(topic))
00132 
00133     def test_register_multiple_notopictype(self):
00134         topic = "/test_register_multiple_notopictype"
00135         msg_type = "std_msgs/String"
00136         client1 = "client_test_register_multiple_notopictype_1"
00137         client2 = "client_test_register_multiple_notopictype_2"
00138 
00139         self.assertFalse(topic in manager._publishers)
00140         self.assertFalse(self.is_topic_published(topic))
00141         manager.register(client1, topic, msg_type)
00142         self.assertTrue(topic in manager._publishers)
00143         self.assertTrue(self.is_topic_published(topic))
00144         manager.register(client2, topic)
00145         self.assertTrue(topic in manager._publishers)
00146         self.assertTrue(self.is_topic_published(topic))
00147         manager.unregister(client1, topic)
00148         self.assertTrue(topic in manager._publishers)
00149         self.assertTrue(self.is_topic_published(topic))
00150         manager.unregister(client2, topic)
00151         self.assertFalse(topic in manager._publishers)
00152         self.assertFalse(self.is_topic_published(topic))
00153 
00154     def test_publish_not_registered(self):
00155         topic = "/test_publish_not_registered"
00156         msg = {"data": "test publish not registered"}
00157         client = "client_test_publish_not_registered"
00158 
00159         self.assertFalse(topic in manager._publishers)
00160         self.assertFalse(self.is_topic_published(topic))
00161         self.assertRaises(TopicNotEstablishedException, manager.publish, client, topic, msg)
00162 
00163     def test_publisher_manager_publish(self):
00164         """ Make sure that publishing works """
00165         topic = "/test_publisher_manager_publish"
00166         msg = {"data": "test publisher manager publish"}
00167         client = "client_test_publisher_manager_publish"
00168 
00169         received = {"msg": None}
00170 
00171         def cb(msg):
00172             received["msg"] = msg
00173 
00174         rospy.Subscriber(topic, String, cb)
00175         manager.publish(client, topic, msg)
00176         sleep(0.5)
00177 
00178         self.assertEqual(received["msg"].data, msg["data"])
00179 
00180     def test_publisher_manager_bad_publish(self):
00181         """ Make sure that bad publishing fails """
00182         topic = "/test_publisher_manager_bad_publish"
00183         client = "client_test_publisher_manager_bad_publish"
00184         msg_type = "std_msgs/String"
00185         msg = {"data": 3}
00186 
00187         manager.register(client, topic, msg_type)
00188         self.assertRaises(FieldTypeMismatchException, manager.publish, client, topic, msg)
00189 
00190 
00191 PKG = 'rosbridge_library'
00192 NAME = 'test_publisher_manager'
00193 if __name__ == '__main__':
00194     rostest.unitrun(PKG, NAME, TestPublisherManager)
00195 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Thu Aug 27 2015 14:50:35