00001
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006
00007 from time import sleep
00008
00009 from rosbridge_library.internal.publishers import *
00010 from rosbridge_library.internal.topics import *
00011 from rosbridge_library.internal.message_conversion import FieldTypeMismatchException
00012 from std_msgs.msg import String, Int32
00013
00014
class TestPublisherManager(unittest.TestCase):
    """ Integration tests for the module-level publisher ``manager``.

    ``manager``, ``UNREGISTER_TIMEOUT`` and the exception classes used below
    are not defined in this file; they presumably arrive through the star
    imports of ``rosbridge_library.internal.publishers`` and
    ``rosbridge_library.internal.topics`` (TODO confirm).

    Each test uses its own topic name so that tests do not observe each
    other's publishers.
    """

    def setUp(self):
        """ Initialise the ROS node used by every test. """
        rospy.init_node("test_publisher_manager")
        # Deliberately no ``UNREGISTER_TIMEOUT = 1.0`` here: assigned inside
        # this method it would only bind a function-local name and would not
        # change the module-level UNREGISTER_TIMEOUT that the tests sleep on.
        # NOTE(review): if a shorter timeout is wanted it has to be set on
        # whatever object the manager actually reads it from -- confirm
        # against rosbridge_library.internal.publishers.

    def is_topic_published(self, topicname):
        """ Return True if `topicname` is listed by rospy.get_published_topics(). """
        return topicname in dict(rospy.get_published_topics())

    def _wait_for_unregister(self):
        """ Sleep 10% longer than UNREGISTER_TIMEOUT so pending timers can fire. """
        sleep(UNREGISTER_TIMEOUT * 1.1)

    def test_register_publisher(self):
        """ Register a publisher on a clean topic with a good msg type """
        topic = "/test_register_publisher"
        msg_type = "std_msgs/String"
        client = "client_test_register_publisher"

        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))
        manager.register(client, topic, msg_type)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))

        # Unregistering only schedules removal: the publisher must still be
        # present until the timeout has elapsed.
        manager.unregister(client, topic)
        self.assertIn(topic, manager.unregister_timers)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))
        self._wait_for_unregister()
        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))
        self.assertNotIn(topic, manager.unregister_timers)

    def test_register_publisher_multiclient(self):
        """ Two clients share one topic; it goes away only after both unregister """
        topic = "/test_register_publisher_multiclient"
        msg_type = "std_msgs/String"
        client1 = "client_test_register_publisher_1"
        client2 = "client_test_register_publisher_2"

        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))
        manager.register(client1, topic, msg_type)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))
        manager.register(client2, topic, msg_type)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))
        manager.unregister(client1, topic)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))
        manager.unregister(client2, topic)
        self.assertIn(topic, manager.unregister_timers)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))
        self._wait_for_unregister()
        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))
        self.assertNotIn(topic, manager.unregister_timers)

    def test_register_publisher_conflicting_types(self):
        """ Registering an existing topic with a different msg type must raise """
        topic = "/test_register_publisher_conflicting_types"
        msg_type = "std_msgs/String"
        msg_type_bad = "std_msgs/Int32"
        client = "client_test_register_publisher_conflicting_types"

        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))
        manager.register(client, topic, msg_type)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))

        self.assertRaises(TypeConflictException, manager.register,
                          "client2", topic, msg_type_bad)

    def test_register_multiple_publishers(self):
        """ One client registers two topics; each is tracked independently """
        topic1 = "/test_register_multiple_publishers1"
        topic2 = "/test_register_multiple_publishers2"
        msg_type = "std_msgs/String"
        client = "client_test_register_multiple_publishers"

        self.assertNotIn(topic1, manager._publishers)
        self.assertNotIn(topic2, manager._publishers)
        self.assertFalse(self.is_topic_published(topic1))
        self.assertFalse(self.is_topic_published(topic2))
        manager.register(client, topic1, msg_type)
        self.assertIn(topic1, manager._publishers)
        self.assertTrue(self.is_topic_published(topic1))
        self.assertNotIn(topic2, manager._publishers)
        self.assertFalse(self.is_topic_published(topic2))
        manager.register(client, topic2, msg_type)
        self.assertIn(topic1, manager._publishers)
        self.assertTrue(self.is_topic_published(topic1))
        self.assertIn(topic2, manager._publishers)
        self.assertTrue(self.is_topic_published(topic2))

        # Unregistering topic1 must leave topic2 untouched.
        manager.unregister(client, topic1)
        self.assertTrue(self.is_topic_published(topic1))
        self.assertIn(topic1, manager.unregister_timers)
        self.assertIn(topic2, manager._publishers)
        self.assertTrue(self.is_topic_published(topic2))

        manager.unregister(client, topic2)
        self.assertIn(topic2, manager.unregister_timers)
        self.assertTrue(self.is_topic_published(topic2))
        self._wait_for_unregister()
        self.assertNotIn(topic1, manager._publishers)
        self.assertFalse(self.is_topic_published(topic1))
        self.assertNotIn(topic2, manager._publishers)
        self.assertFalse(self.is_topic_published(topic2))
        self.assertNotIn(topic1, manager.unregister_timers)
        self.assertNotIn(topic2, manager.unregister_timers)

    def test_register_no_msgtype(self):
        """ Registering an unpublished topic without a msg type must raise """
        topic = "/test_register_no_msgtype"
        client = "client_test_register_no_msgtype"

        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))
        self.assertRaises(TopicNotEstablishedException, manager.register,
                          client, topic)

    def test_register_infer_topictype(self):
        """ Registering without a msg type works once the topic is already published """
        topic = "/test_register_infer_topictype"
        client = "client_test_register_infer_topictype"

        self.assertFalse(self.is_topic_published(topic))

        # Publish the topic outside the manager so its type is already known
        # when register() is called without one.
        rospy.Publisher(topic, String)

        self.assertTrue(self.is_topic_published(topic))
        self.assertNotIn(topic, manager._publishers)
        manager.register(client, topic)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))

        manager.unregister(client, topic)
        self.assertIn(topic, manager.unregister_timers)
        self.assertTrue(self.is_topic_published(topic))

    def test_register_multiple_notopictype(self):
        """ A second client may register without a msg type on a managed topic """
        topic = "/test_register_multiple_notopictype"
        msg_type = "std_msgs/String"
        client1 = "client_test_register_multiple_notopictype_1"
        client2 = "client_test_register_multiple_notopictype_2"

        self.assertNotIn(topic, manager._publishers)
        self.assertNotIn(topic, manager.unregister_timers)
        self.assertFalse(self.is_topic_published(topic))
        manager.register(client1, topic, msg_type)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))
        manager.register(client2, topic)
        self.assertIn(topic, manager._publishers)
        self.assertTrue(self.is_topic_published(topic))
        manager.unregister(client1, topic)
        self.assertIn(topic, manager._publishers)
        self.assertIn(topic, manager.unregister_timers)
        self.assertTrue(self.is_topic_published(topic))
        manager.unregister(client2, topic)
        self.assertIn(topic, manager.unregister_timers)
        self.assertIn(topic, manager._publishers)
        self._wait_for_unregister()
        self.assertNotIn(topic, manager._publishers)
        self.assertNotIn(topic, manager.unregister_timers)
        self.assertFalse(self.is_topic_published(topic))

    def test_publish_not_registered(self):
        """ Publishing to an unknown, unpublished topic must raise """
        topic = "/test_publish_not_registered"
        msg = {"data": "test publish not registered"}
        client = "client_test_publish_not_registered"

        self.assertNotIn(topic, manager._publishers)
        self.assertFalse(self.is_topic_published(topic))
        self.assertRaises(TopicNotEstablishedException, manager.publish,
                          client, topic, msg)

    def test_publisher_manager_publish(self):
        """ Make sure that publishing works """
        topic = "/test_publisher_manager_publish"
        msg = {"data": "test publisher manager publish"}
        client = "client_test_publisher_manager_publish"

        # Mutable holder so the subscriber callback can hand the message back.
        received = {"msg": None}

        def cb(incoming):
            received["msg"] = incoming

        rospy.Subscriber(topic, String, cb)
        manager.publish(client, topic, msg)
        # Give the message time to travel from publisher to subscriber.
        sleep(0.5)

        # Fail with a clear message instead of an AttributeError on None when
        # nothing was delivered in time.
        self.assertIsNotNone(received["msg"],
                             "no message received on %s" % topic)
        self.assertEqual(received["msg"].data, msg["data"])

    def test_publisher_manager_bad_publish(self):
        """ Make sure that bad publishing fails """
        topic = "/test_publisher_manager_bad_publish"
        client = "client_test_publisher_manager_bad_publish"
        msg_type = "std_msgs/String"
        # Integer payload for a std_msgs/String topic.
        msg = {"data": 3}

        manager.register(client, topic, msg_type)
        self.assertRaises(FieldTypeMismatchException, manager.publish,
                          client, topic, msg)
00212
# Package and test name handed to rostest when this file is run as a script.
PKG = "rosbridge_library"
NAME = "test_publisher_manager"

if __name__ == "__main__":
    # Run the test case through rostest's unit-test runner.
    rostest.unitrun(PKG, NAME, TestPublisherManager)
00217