00001
00002 PKG = 'rosbridge_library'
00003 import roslib; roslib.load_manifest(PKG); roslib.load_manifest("std_msgs")
00004 import rospy
00005
00006 from time import sleep
00007
00008 from rosbridge_library.internal.publishers import *
00009 from rosbridge_library.internal.topics import *
00010 from rosbridge_library.internal.message_conversion import FieldTypeMismatchException
00011 from std_msgs.msg import String, Int32
00012
00013 import unittest
00014
00015
00016 class TestPublisherManager(unittest.TestCase):
00017
00018 def setUp(self):
00019 rospy.init_node("test_publisher_manager")
00020
00021 def is_topic_published(self, topicname):
00022 return topicname in dict(rospy.get_published_topics()).keys()
00023
00024 def test_register_publisher(self):
00025 """ Register a publisher on a clean topic with a good msg type """
00026 topic = "/test_register_publisher"
00027 msg_type = "std_msgs/String"
00028 client = "client_test_register_publisher"
00029
00030 self.assertFalse(topic in manager._publishers)
00031 self.assertFalse(self.is_topic_published(topic))
00032 manager.register(client, topic, msg_type)
00033 self.assertTrue(topic in manager._publishers)
00034 self.assertTrue(self.is_topic_published(topic))
00035
00036 manager.unregister(client, topic)
00037 self.assertFalse(topic in manager._publishers)
00038 self.assertFalse(self.is_topic_published(topic))
00039
00040 def test_register_publisher_multiclient(self):
00041 topic = "/test_register_publisher_multiclient"
00042 msg_type = "std_msgs/String"
00043 client1 = "client_test_register_publisher_1"
00044 client2 = "client_test_register_publisher_2"
00045
00046 self.assertFalse(topic in manager._publishers)
00047 self.assertFalse(self.is_topic_published(topic))
00048 manager.register(client1, topic, msg_type)
00049 self.assertTrue(topic in manager._publishers)
00050 self.assertTrue(self.is_topic_published(topic))
00051 manager.register(client2, topic, msg_type)
00052 self.assertTrue(topic in manager._publishers)
00053 self.assertTrue(self.is_topic_published(topic))
00054 manager.unregister(client1, topic)
00055 self.assertTrue(topic in manager._publishers)
00056 self.assertTrue(self.is_topic_published(topic))
00057 manager.unregister(client2, topic)
00058 self.assertFalse(topic in manager._publishers)
00059 self.assertFalse(self.is_topic_published(topic))
00060
00061 def test_register_publisher_conflicting_types(self):
00062 topic = "/test_register_publisher_conflicting_types"
00063 msg_type = "std_msgs/String"
00064 msg_type_bad = "std_msgs/Int32"
00065 client = "client_test_register_publisher_conflicting_types"
00066
00067 self.assertFalse(topic in manager._publishers)
00068 self.assertFalse(self.is_topic_published(topic))
00069 manager.register(client, topic, msg_type)
00070 self.assertTrue(topic in manager._publishers)
00071 self.assertTrue(self.is_topic_published(topic))
00072
00073 self.assertRaises(TypeConflictException, manager.register, "client2", topic, msg_type_bad)
00074
00075 def test_register_multiple_publishers(self):
00076 topic1 = "/test_register_multiple_publishers1"
00077 topic2 = "/test_register_multiple_publishers2"
00078 msg_type = "std_msgs/String"
00079 client = "client_test_register_multiple_publishers"
00080
00081 self.assertFalse(topic1 in manager._publishers)
00082 self.assertFalse(topic2 in manager._publishers)
00083 self.assertFalse(self.is_topic_published(topic1))
00084 self.assertFalse(self.is_topic_published(topic2))
00085 manager.register(client, topic1, msg_type)
00086 self.assertTrue(topic1 in manager._publishers)
00087 self.assertTrue(self.is_topic_published(topic1))
00088 self.assertFalse(topic2 in manager._publishers)
00089 self.assertFalse(self.is_topic_published(topic2))
00090 manager.register(client, topic2, msg_type)
00091 self.assertTrue(topic1 in manager._publishers)
00092 self.assertTrue(self.is_topic_published(topic1))
00093 self.assertTrue(topic2 in manager._publishers)
00094 self.assertTrue(self.is_topic_published(topic2))
00095
00096 manager.unregister(client, topic1)
00097 self.assertFalse(topic1 in manager._publishers)
00098 self.assertFalse(self.is_topic_published(topic1))
00099 self.assertTrue(topic2 in manager._publishers)
00100 self.assertTrue(self.is_topic_published(topic2))
00101
00102 manager.unregister(client, topic2)
00103 self.assertFalse(topic1 in manager._publishers)
00104 self.assertFalse(self.is_topic_published(topic1))
00105 self.assertFalse(topic2 in manager._publishers)
00106 self.assertFalse(self.is_topic_published(topic2))
00107
00108 def test_register_no_msgtype(self):
00109 topic = "/test_register_no_msgtype"
00110 client = "client_test_register_no_msgtype"
00111
00112 self.assertFalse(topic in manager._publishers)
00113 self.assertFalse(self.is_topic_published(topic))
00114 self.assertRaises(TopicNotEstablishedException, manager.register, client, topic)
00115
00116 def test_register_infer_topictype(self):
00117 topic = "/test_register_infer_topictype"
00118 client = "client_test_register_infer_topictype"
00119
00120 self.assertFalse(self.is_topic_published(topic))
00121
00122 rospy.Publisher(topic, String)
00123
00124 self.assertTrue(self.is_topic_published(topic))
00125 self.assertFalse(topic in manager._publishers)
00126 manager.register(client, topic)
00127 self.assertTrue(topic in manager._publishers)
00128 self.assertTrue(self.is_topic_published(topic))
00129
00130 manager.unregister(client, topic)
00131 self.assertFalse(topic in manager._publishers)
00132 self.assertTrue(self.is_topic_published(topic))
00133
00134 def test_register_multiple_notopictype(self):
00135 topic = "/test_register_multiple_notopictype"
00136 msg_type = "std_msgs/String"
00137 client1 = "client_test_register_multiple_notopictype_1"
00138 client2 = "client_test_register_multiple_notopictype_2"
00139
00140 self.assertFalse(topic in manager._publishers)
00141 self.assertFalse(self.is_topic_published(topic))
00142 manager.register(client1, topic, msg_type)
00143 self.assertTrue(topic in manager._publishers)
00144 self.assertTrue(self.is_topic_published(topic))
00145 manager.register(client2, topic)
00146 self.assertTrue(topic in manager._publishers)
00147 self.assertTrue(self.is_topic_published(topic))
00148 manager.unregister(client1, topic)
00149 self.assertTrue(topic in manager._publishers)
00150 self.assertTrue(self.is_topic_published(topic))
00151 manager.unregister(client2, topic)
00152 self.assertFalse(topic in manager._publishers)
00153 self.assertFalse(self.is_topic_published(topic))
00154
00155 def test_publish_not_registered(self):
00156 topic = "/test_publish_not_registered"
00157 msg = {"data": "test publish not registered"}
00158 client = "client_test_publish_not_registered"
00159
00160 self.assertFalse(topic in manager._publishers)
00161 self.assertFalse(self.is_topic_published(topic))
00162 self.assertRaises(TopicNotEstablishedException, manager.publish, client, topic, msg)
00163
00164 def test_publisher_manager_publish(self):
00165 """ Make sure that publishing works """
00166 topic = "/test_publisher_manager_publish"
00167 msg = {"data": "test publisher manager publish"}
00168 client = "client_test_publisher_manager_publish"
00169
00170 received = {"msg": None}
00171
00172 def cb(msg):
00173 received["msg"] = msg
00174
00175 rospy.Subscriber(topic, String, cb)
00176 manager.publish(client, topic, msg)
00177 sleep(0.5)
00178
00179 self.assertEqual(received["msg"].data, msg["data"])
00180
00181 def test_publisher_manager_bad_publish(self):
00182 """ Make sure that bad publishing fails """
00183 topic = "/test_publisher_manager_bad_publish"
00184 client = "client_test_publisher_manager_bad_publish"
00185 msg_type = "std_msgs/String"
00186 msg = {"data": 3}
00187
00188 manager.register(client, topic, msg_type)
00189 self.assertRaises(FieldTypeMismatchException, manager.publish, client, topic, msg)
00190
00191
00192 if __name__ == '__main__':
00193 import rostest
00194 rostest.rosrun(PKG, 'test_publisher_manager', TestPublisherManager)