00001
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006
00007 from time import sleep
00008
00009 from rosbridge_library.internal.publishers import *
00010 from rosbridge_library.internal.topics import *
00011 from rosbridge_library.internal.message_conversion import FieldTypeMismatchException
00012 from std_msgs.msg import String, Int32
00013
00014
class TestPublisherManager(unittest.TestCase):
    """Tests for registering, unregistering and publishing via ``manager``.

    ``manager`` is not defined in this file; it is presumably provided by one
    of the ``rosbridge_library.internal`` star imports -- confirm against
    that package.  Every test uses its own topic name so that state held by
    the shared ``manager`` cannot leak between tests.
    """

    def setUp(self):
        """Initialise the ROS node that every test runs under."""
        rospy.init_node("test_publisher_manager")

    def is_topic_published(self, topicname):
        """Return True if ``topicname`` is listed by ``rospy.get_published_topics()``."""
        # Membership on the dict tests its keys directly; no .keys() needed.
        return topicname in dict(rospy.get_published_topics())

    def _assert_registered(self, topic):
        """Assert ``topic`` is held by the manager and reported as published."""
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic),
                        "%s should be published" % topic)

    def _assert_not_registered(self, topic):
        """Assert ``topic`` is neither held by the manager nor reported as published."""
        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic),
                         "%s should not be published" % topic)

    def test_register_publisher(self):
        """ Register a publisher on a clean topic with a good msg type """
        topic = "/test_register_publisher"
        msg_type = "std_msgs/String"
        client = "client_test_register_publisher"

        self._assert_not_registered(topic)
        manager.register(client, topic, msg_type)
        self._assert_registered(topic)

        manager.unregister(client, topic)
        self._assert_not_registered(topic)

    def test_register_publisher_multiclient(self):
        """ A topic stays registered until its last client unregisters """
        topic = "/test_register_publisher_multiclient"
        msg_type = "std_msgs/String"
        client1 = "client_test_register_publisher_1"
        client2 = "client_test_register_publisher_2"

        self._assert_not_registered(topic)
        manager.register(client1, topic, msg_type)
        self._assert_registered(topic)
        manager.register(client2, topic, msg_type)
        self._assert_registered(topic)
        manager.unregister(client1, topic)
        self._assert_registered(topic)
        manager.unregister(client2, topic)
        self._assert_not_registered(topic)

    def test_register_publisher_conflicting_types(self):
        """ Registering an existing topic with a different msg type fails """
        topic = "/test_register_publisher_conflicting_types"
        msg_type = "std_msgs/String"
        msg_type_bad = "std_msgs/Int32"
        client = "client_test_register_publisher_conflicting_types"

        self._assert_not_registered(topic)
        manager.register(client, topic, msg_type)
        try:
            self._assert_registered(topic)
            self.assertRaises(TypeConflictException,
                              manager.register, "client2", topic, msg_type_bad)
        finally:
            # Release the registration even when an assertion fails so the
            # publisher does not linger in the shared manager.
            manager.unregister(client, topic)

    def test_register_multiple_publishers(self):
        """ One client can register and unregister two topics independently """
        topic1 = "/test_register_multiple_publishers1"
        topic2 = "/test_register_multiple_publishers2"
        msg_type = "std_msgs/String"
        client = "client_test_register_multiple_publishers"

        self._assert_not_registered(topic1)
        self._assert_not_registered(topic2)

        manager.register(client, topic1, msg_type)
        self._assert_registered(topic1)
        self._assert_not_registered(topic2)

        manager.register(client, topic2, msg_type)
        self._assert_registered(topic1)
        self._assert_registered(topic2)

        manager.unregister(client, topic1)
        self._assert_not_registered(topic1)
        self._assert_registered(topic2)

        manager.unregister(client, topic2)
        self._assert_not_registered(topic1)
        self._assert_not_registered(topic2)

    def test_register_no_msgtype(self):
        """ Registering an unknown topic without a msg type fails """
        topic = "/test_register_no_msgtype"
        client = "client_test_register_no_msgtype"

        self._assert_not_registered(topic)
        self.assertRaises(TopicNotEstablishedException,
                          manager.register, client, topic)

    def test_register_infer_topictype(self):
        """ Register without a msg type on a topic that is already published """
        topic = "/test_register_infer_topictype"
        client = "client_test_register_infer_topictype"

        self.assertFalse(self.is_topic_published(topic))

        # Advertise the topic outside the manager so that it already exists
        # (with a type) when the manager is asked to register it.
        rospy.Publisher(topic, String)

        self.assertTrue(self.is_topic_published(topic))
        self.assertNotIn(topic, manager._publishers)
        manager.register(client, topic)
        self._assert_registered(topic)

        manager.unregister(client, topic)
        self.assertNotIn(topic, manager._publishers)
        # The publisher created directly above is still advertising.
        self.assertTrue(self.is_topic_published(topic))

    def test_register_multiple_notopictype(self):
        """ A second client may omit the msg type once the topic is registered """
        topic = "/test_register_multiple_notopictype"
        msg_type = "std_msgs/String"
        client1 = "client_test_register_multiple_notopictype_1"
        client2 = "client_test_register_multiple_notopictype_2"

        self._assert_not_registered(topic)
        manager.register(client1, topic, msg_type)
        self._assert_registered(topic)
        manager.register(client2, topic)
        self._assert_registered(topic)
        manager.unregister(client1, topic)
        self._assert_registered(topic)
        manager.unregister(client2, topic)
        self._assert_not_registered(topic)

    def test_publish_not_registered(self):
        """ Publishing to an unknown, unregistered topic fails """
        topic = "/test_publish_not_registered"
        msg = {"data": "test publish not registered"}
        client = "client_test_publish_not_registered"

        self._assert_not_registered(topic)
        self.assertRaises(TopicNotEstablishedException,
                          manager.publish, client, topic, msg)

    def test_publisher_manager_publish(self):
        """ Make sure that publishing works """
        topic = "/test_publisher_manager_publish"
        msg = {"data": "test publisher manager publish"}
        client = "client_test_publisher_manager_publish"

        # Mutable holder so the callback can hand the message back to the test.
        received = {"msg": None}

        def cb(ros_msg):
            received["msg"] = ros_msg

        rospy.Subscriber(topic, String, cb)
        manager.publish(client, topic, msg)
        # Give the message time to travel from publisher to subscriber.
        sleep(0.5)

        # Fail with a readable message rather than an AttributeError on None
        # when nothing was delivered.
        self.assertIsNotNone(received["msg"],
                             "no message received on %s" % topic)
        self.assertEqual(received["msg"].data, msg["data"])

    def test_publisher_manager_bad_publish(self):
        """ Make sure that bad publishing fails """
        topic = "/test_publisher_manager_bad_publish"
        client = "client_test_publisher_manager_bad_publish"
        msg_type = "std_msgs/String"
        # An integer where std_msgs/String expects string data.
        msg = {"data": 3}

        manager.register(client, topic, msg_type)
        try:
            self.assertRaises(FieldTypeMismatchException,
                              manager.publish, client, topic, msg)
        finally:
            # Do not leave the publisher registered in the shared manager.
            manager.unregister(client, topic)
00189
00190
# Package name and test name handed to rostest when run as a script.
PKG = "rosbridge_library"
NAME = "test_publisher_manager"

if __name__ == "__main__":
    # Run the test case through rostest's unit-test runner.
    rostest.unitrun(PKG, NAME, TestPublisherManager)
00195