test_publisher_manager.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 
00007 from time import sleep
00008 
00009 from rosbridge_library.internal.publishers import *
00010 from rosbridge_library.internal.topics import *
00011 from rosbridge_library.internal.message_conversion import FieldTypeMismatchException
00012 from std_msgs.msg import String, Int32
00013 
00014 
00015 class TestPublisherManager(unittest.TestCase):
00016 
00017     def setUp(self):
00018         rospy.init_node("test_publisher_manager")
00019         UNREGISTER_TIMEOUT = 1.0
00020 
00021     def is_topic_published(self, topicname):
00022         return topicname in dict(rospy.get_published_topics()).keys()
00023 
00024     def test_register_publisher(self):
00025         """ Register a publisher on a clean topic with a good msg type """
00026         topic = "/test_register_publisher"
00027         msg_type = "std_msgs/String"
00028         client = "client_test_register_publisher"
00029 
00030         self.assertFalse(topic in manager._publishers)
00031         self.assertFalse(self.is_topic_published(topic))
00032         manager.register(client, topic, msg_type)
00033         self.assertTrue(topic in manager._publishers)
00034         self.assertTrue(self.is_topic_published(topic))
00035 
00036         manager.unregister(client, topic)
00037         self.assertTrue(topic in manager.unregister_timers)
00038         self.assertTrue(topic in manager._publishers)
00039         self.assertTrue(self.is_topic_published(topic))
00040         sleep(UNREGISTER_TIMEOUT*1.1)
00041         self.assertFalse(topic in manager._publishers)
00042         self.assertFalse(self.is_topic_published(topic))
00043         self.assertFalse(topic in manager.unregister_timers)
00044 
00045     def test_register_publisher_multiclient(self):
00046         topic = "/test_register_publisher_multiclient"
00047         msg_type = "std_msgs/String"
00048         client1 = "client_test_register_publisher_1"
00049         client2 = "client_test_register_publisher_2"
00050 
00051         self.assertFalse(topic in manager._publishers)
00052         self.assertFalse(self.is_topic_published(topic))
00053         manager.register(client1, topic, msg_type)
00054         self.assertTrue(topic in manager._publishers)
00055         self.assertTrue(self.is_topic_published(topic))
00056         manager.register(client2, topic, msg_type)
00057         self.assertTrue(topic in manager._publishers)
00058         self.assertTrue(self.is_topic_published(topic))
00059         manager.unregister(client1, topic)
00060         self.assertTrue(topic in manager._publishers)
00061         self.assertTrue(self.is_topic_published(topic))
00062         manager.unregister(client2, topic)
00063         self.assertTrue(topic in manager.unregister_timers)
00064         self.assertTrue(topic in manager._publishers)
00065         self.assertTrue(self.is_topic_published(topic))
00066         sleep(UNREGISTER_TIMEOUT*1.1)
00067         self.assertFalse(topic in manager._publishers)
00068         self.assertFalse(self.is_topic_published(topic))
00069         self.assertFalse(topic in manager.unregister_timers)
00070 
00071     def test_register_publisher_conflicting_types(self):
00072         topic = "/test_register_publisher_conflicting_types"
00073         msg_type = "std_msgs/String"
00074         msg_type_bad = "std_msgs/Int32"
00075         client = "client_test_register_publisher_conflicting_types"
00076 
00077         self.assertFalse(topic in manager._publishers)
00078         self.assertFalse(self.is_topic_published(topic))
00079         manager.register(client, topic, msg_type)
00080         self.assertTrue(topic in manager._publishers)
00081         self.assertTrue(self.is_topic_published(topic))
00082 
00083         self.assertRaises(TypeConflictException, manager.register, "client2", topic, msg_type_bad)
00084 
00085     def test_register_multiple_publishers(self):
00086         topic1 = "/test_register_multiple_publishers1"
00087         topic2 = "/test_register_multiple_publishers2"
00088         msg_type = "std_msgs/String"
00089         client = "client_test_register_multiple_publishers"
00090 
00091         self.assertFalse(topic1 in manager._publishers)
00092         self.assertFalse(topic2 in manager._publishers)
00093         self.assertFalse(self.is_topic_published(topic1))
00094         self.assertFalse(self.is_topic_published(topic2))
00095         manager.register(client, topic1, msg_type)
00096         self.assertTrue(topic1 in manager._publishers)
00097         self.assertTrue(self.is_topic_published(topic1))
00098         self.assertFalse(topic2 in manager._publishers)
00099         self.assertFalse(self.is_topic_published(topic2))
00100         manager.register(client, topic2, msg_type)
00101         self.assertTrue(topic1 in manager._publishers)
00102         self.assertTrue(self.is_topic_published(topic1))
00103         self.assertTrue(topic2 in manager._publishers)
00104         self.assertTrue(self.is_topic_published(topic2))
00105 
00106         manager.unregister(client, topic1)
00107         self.assertTrue(self.is_topic_published(topic1))
00108         self.assertTrue(topic1 in manager.unregister_timers)
00109         self.assertTrue(topic2 in manager._publishers)
00110         self.assertTrue(self.is_topic_published(topic2))
00111 
00112         manager.unregister(client, topic2)
00113         self.assertTrue(topic2 in manager.unregister_timers)
00114         self.assertTrue(self.is_topic_published(topic2))
00115         sleep(UNREGISTER_TIMEOUT*1.1)
00116         self.assertFalse(topic1 in manager._publishers)
00117         self.assertFalse(self.is_topic_published(topic1))
00118         self.assertFalse(topic2 in manager._publishers)
00119         self.assertFalse(self.is_topic_published(topic2))
00120         self.assertFalse(topic1 in manager.unregister_timers)
00121         self.assertFalse(topic2 in manager.unregister_timers)
00122 
00123     def test_register_no_msgtype(self):
00124         topic = "/test_register_no_msgtype"
00125         client = "client_test_register_no_msgtype"
00126 
00127         self.assertFalse(topic in manager._publishers)
00128         self.assertFalse(self.is_topic_published(topic))
00129         self.assertRaises(TopicNotEstablishedException, manager.register, client, topic)
00130 
00131     def test_register_infer_topictype(self):
00132         topic = "/test_register_infer_topictype"
00133         client = "client_test_register_infer_topictype"
00134 
00135         self.assertFalse(self.is_topic_published(topic))
00136 
00137         rospy.Publisher(topic, String)
00138 
00139         self.assertTrue(self.is_topic_published(topic))
00140         self.assertFalse(topic in manager._publishers)
00141         manager.register(client, topic)
00142         self.assertTrue(topic in manager._publishers)
00143         self.assertTrue(self.is_topic_published(topic))
00144 
00145         manager.unregister(client, topic)
00146         self.assertTrue(topic in manager.unregister_timers)
00147         self.assertTrue(self.is_topic_published(topic))
00148 
00149     def test_register_multiple_notopictype(self):
00150         topic = "/test_register_multiple_notopictype"
00151         msg_type = "std_msgs/String"
00152         client1 = "client_test_register_multiple_notopictype_1"
00153         client2 = "client_test_register_multiple_notopictype_2"
00154 
00155         self.assertFalse(topic in manager._publishers)
00156         self.assertFalse(topic in manager.unregister_timers)
00157         self.assertFalse(self.is_topic_published(topic))
00158         manager.register(client1, topic, msg_type)
00159         self.assertTrue(topic in manager._publishers)
00160         self.assertTrue(self.is_topic_published(topic))
00161         manager.register(client2, topic)
00162         self.assertTrue(topic in manager._publishers)
00163         self.assertTrue(self.is_topic_published(topic))
00164         manager.unregister(client1, topic)
00165         self.assertTrue(topic in manager._publishers)
00166         self.assertTrue(topic in manager.unregister_timers)
00167         self.assertTrue(self.is_topic_published(topic))
00168         manager.unregister(client2, topic)
00169         self.assertTrue(topic in manager.unregister_timers)
00170         self.assertTrue(topic in manager._publishers)
00171         sleep(UNREGISTER_TIMEOUT*1.1)
00172         self.assertFalse(topic in manager._publishers)
00173         self.assertFalse(topic in manager.unregister_timers)
00174         self.assertFalse(self.is_topic_published(topic))
00175 
00176     def test_publish_not_registered(self):
00177         topic = "/test_publish_not_registered"
00178         msg = {"data": "test publish not registered"}
00179         client = "client_test_publish_not_registered"
00180 
00181         self.assertFalse(topic in manager._publishers)
00182         self.assertFalse(self.is_topic_published(topic))
00183         self.assertRaises(TopicNotEstablishedException, manager.publish, client, topic, msg)
00184 
00185     def test_publisher_manager_publish(self):
00186         """ Make sure that publishing works """
00187         topic = "/test_publisher_manager_publish"
00188         msg = {"data": "test publisher manager publish"}
00189         client = "client_test_publisher_manager_publish"
00190 
00191         received = {"msg": None}
00192 
00193         def cb(msg):
00194             received["msg"] = msg
00195 
00196         rospy.Subscriber(topic, String, cb)
00197         manager.publish(client, topic, msg)
00198         sleep(0.5)
00199 
00200         self.assertEqual(received["msg"].data, msg["data"])
00201 
00202     def test_publisher_manager_bad_publish(self):
00203         """ Make sure that bad publishing fails """
00204         topic = "/test_publisher_manager_bad_publish"
00205         client = "client_test_publisher_manager_bad_publish"
00206         msg_type = "std_msgs/String"
00207         msg = {"data": 3}
00208 
00209         manager.register(client, topic, msg_type)
00210         self.assertRaises(FieldTypeMismatchException, manager.publish, client, topic, msg)
00211 
00212 
00213 PKG = 'rosbridge_library'
00214 NAME = 'test_publisher_manager'
00215 if __name__ == '__main__':
00216     rostest.unitrun(PKG, NAME, TestPublisherManager)
00217 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Thu Jun 6 2019 21:51:43