test_publisher_manager.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 PKG = 'rosbridge_library'
00003 import roslib; roslib.load_manifest(PKG); roslib.load_manifest("std_msgs")
00004 import rospy
00005 
00006 from time import sleep
00007 
00008 from rosbridge_library.internal.publishers import *
00009 from rosbridge_library.internal.topics import *
00010 from rosbridge_library.internal.message_conversion import FieldTypeMismatchException
00011 from std_msgs.msg import String, Int32
00012 
00013 import unittest
00014 
00015 
class TestPublisherManager(unittest.TestCase):
    """ Tests for registering, unregistering and publishing through ``manager``.

    ``manager`` is not defined in this file; it arrives through one of the
    wildcard imports at the top (presumably
    ``rosbridge_library.internal.publishers`` -- confirm).  Every test uses
    its own topic name so that state held by the shared ``manager`` does not
    bleed from one test into another.
    """

    def setUp(self):
        """ Initialise the ROS node that every test runs under """
        rospy.init_node("test_publisher_manager")

    def is_topic_published(self, topicname):
        """ Return True if ``topicname`` is listed by rospy.get_published_topics() """
        # get_published_topics() yields (name, type) pairs; a dict keyed by
        # name supports the membership test directly, no .keys() needed.
        return topicname in dict(rospy.get_published_topics())

    def test_register_publisher(self):
        """ Register a publisher on a clean topic with a good msg type """
        topic = "/test_register_publisher"
        msg_type = "std_msgs/String"
        client = "client_test_register_publisher"

        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))
        manager.register(client, topic, msg_type)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))

        manager.unregister(client, topic)
        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))

    def test_register_publisher_multiclient(self):
        """ A topic stays published until its last client unregisters """
        topic = "/test_register_publisher_multiclient"
        msg_type = "std_msgs/String"
        client1 = "client_test_register_publisher_1"
        client2 = "client_test_register_publisher_2"

        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))
        manager.register(client1, topic, msg_type)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))
        manager.register(client2, topic, msg_type)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))
        manager.unregister(client1, topic)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))
        manager.unregister(client2, topic)
        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))

    def test_register_publisher_conflicting_types(self):
        """ Registering an existing topic with a different msg type raises """
        topic = "/test_register_publisher_conflicting_types"
        msg_type = "std_msgs/String"
        msg_type_bad = "std_msgs/Int32"
        client = "client_test_register_publisher_conflicting_types"

        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))
        manager.register(client, topic, msg_type)
        # Release the registration once the test finishes (pass or fail) so
        # the publisher does not linger in the shared manager.
        self.addCleanup(manager.unregister, client, topic)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))

        self.assertRaises(TypeConflictException, manager.register, "client2", topic, msg_type_bad)

    def test_register_multiple_publishers(self):
        """ One client can register and unregister two topics independently """
        topic1 = "/test_register_multiple_publishers1"
        topic2 = "/test_register_multiple_publishers2"
        msg_type = "std_msgs/String"
        client = "client_test_register_multiple_publishers"

        self.assertNotIn(topic1, manager._publishers)
        self.assertNotIn(topic2, manager._publishers)
        self.assertFalse(self.is_topic_published(topic1))
        self.assertFalse(self.is_topic_published(topic2))
        manager.register(client, topic1, msg_type)
        self.assertIn(topic1, manager._publishers)
        self.assertTrue(self.is_topic_published(topic1))
        self.assertNotIn(topic2, manager._publishers)
        self.assertFalse(self.is_topic_published(topic2))
        manager.register(client, topic2, msg_type)
        self.assertIn(topic1, manager._publishers)
        self.assertTrue(self.is_topic_published(topic1))
        self.assertIn(topic2, manager._publishers)
        self.assertTrue(self.is_topic_published(topic2))

        manager.unregister(client, topic1)
        self.assertNotIn(topic1, manager._publishers)
        self.assertFalse(self.is_topic_published(topic1))
        self.assertIn(topic2, manager._publishers)
        self.assertTrue(self.is_topic_published(topic2))

        manager.unregister(client, topic2)
        self.assertNotIn(topic1, manager._publishers)
        self.assertFalse(self.is_topic_published(topic1))
        self.assertNotIn(topic2, manager._publishers)
        self.assertFalse(self.is_topic_published(topic2))

    def test_register_no_msgtype(self):
        """ Registering an unpublished topic without a msg type raises """
        topic = "/test_register_no_msgtype"
        client = "client_test_register_no_msgtype"

        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))
        self.assertRaises(TopicNotEstablishedException, manager.register, client, topic)

    def test_register_infer_topictype(self):
        """ Registering without a msg type works once the topic is published """
        topic = "/test_register_infer_topictype"
        client = "client_test_register_infer_topictype"

        self.assertFalse(self.is_topic_published(topic))

        # Publish the topic outside the manager so a type already exists
        # for it when register() is called without msg_type.
        rospy.Publisher(topic, String)

        self.assertTrue(self.is_topic_published(topic))
        self.assertNotIn(topic, manager._publishers)
        manager.register(client, topic)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))

        manager.unregister(client, topic)
        self.assertNotIn(topic, manager._publishers)
        # The rospy.Publisher created above is still advertising the topic.
        self.assertTrue(self.is_topic_published(topic))

    def test_register_multiple_notopictype(self):
        """ A second client may omit the msg type on an already registered topic """
        topic = "/test_register_multiple_notopictype"
        msg_type = "std_msgs/String"
        client1 = "client_test_register_multiple_notopictype_1"
        client2 = "client_test_register_multiple_notopictype_2"

        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))
        manager.register(client1, topic, msg_type)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))
        manager.register(client2, topic)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))
        manager.unregister(client1, topic)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))
        manager.unregister(client2, topic)
        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))

    def test_publish_not_registered(self):
        """ Publishing to a topic nobody registered or published raises """
        topic = "/test_publish_not_registered"
        msg = {"data": "test publish not registered"}
        client = "client_test_publish_not_registered"

        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))
        self.assertRaises(TopicNotEstablishedException, manager.publish, client, topic, msg)

    def test_publisher_manager_publish(self):
        """ Make sure that publishing works """
        topic = "/test_publisher_manager_publish"
        msg = {"data": "test publisher manager publish"}
        client = "client_test_publisher_manager_publish"

        # A dict lets the callback hand the message back to the test body
        # without needing ``nonlocal``.
        received = {"msg": None}

        def cb(msg):
            received["msg"] = msg

        rospy.Subscriber(topic, String, cb)
        manager.publish(client, topic, msg)
        # NOTE(review): publish() appears to register ``client`` implicitly
        # (no explicit register() call precedes it) -- drop that
        # registration when the test ends so it does not leak.
        self.addCleanup(manager.unregister, client, topic)
        # Give the subscriber callback time to run.
        sleep(0.5)

        # Fail with a readable assertion, not an AttributeError on None,
        # when the message never arrived.
        self.assertIsNotNone(received["msg"], "no message received on %s" % topic)
        self.assertEqual(received["msg"].data, msg["data"])

    def test_publisher_manager_bad_publish(self):
        """ Make sure that bad publishing fails """
        topic = "/test_publisher_manager_bad_publish"
        client = "client_test_publisher_manager_bad_publish"
        msg_type = "std_msgs/String"
        # An int where std_msgs/String expects string data.
        msg = {"data": 3}

        manager.register(client, topic, msg_type)
        # Release the registration once the test finishes (pass or fail).
        self.addCleanup(manager.unregister, client, topic)
        self.assertRaises(FieldTypeMismatchException, manager.publish, client, topic, msg)
00190 
00191 
if __name__ == '__main__':
    # Script entry point: hand the test case to rostest under this
    # package name and test name.
    from rostest import rosrun
    rosrun(PKG, 'test_publisher_manager', TestPublisherManager)


rosbridge_test
Author(s): Jonathan Mace
autogenerated on Thu Jan 2 2014 11:54:04