Go to the documentation of this file.00001
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006
00007 from time import sleep, time
00008
00009 from rosbridge_library.internal.publishers import *
00010 from rosbridge_library.internal.topics import *
00011 from rosbridge_library.internal import ros_loader
00012 from rosbridge_library.internal.message_conversion import *
00013 from std_msgs.msg import String, Int32
00014
00015
00016 class TestMultiPublisher(unittest.TestCase):
00017
00018 def setUp(self):
00019 rospy.init_node("test_multi_publisher")
00020
00021 def is_topic_published(self, topicname):
00022 return topicname in dict(rospy.get_published_topics()).keys()
00023
00024 def test_register_multipublisher(self):
00025 """ Register a publisher on a clean topic with a good msg type """
00026 topic = "/test_register_multipublisher"
00027 msg_type = "std_msgs/String"
00028
00029 self.assertFalse(self.is_topic_published(topic))
00030 multipublisher = MultiPublisher(topic, msg_type)
00031 self.assertTrue(self.is_topic_published(topic))
00032
00033 def test_unregister_multipublisher(self):
00034 """ Register and unregister a publisher on a clean topic with a good msg type """
00035 topic = "/test_unregister_multipublisher"
00036 msg_type = "std_msgs/String"
00037
00038 self.assertFalse(self.is_topic_published(topic))
00039 multipublisher = MultiPublisher(topic, msg_type)
00040 self.assertTrue(self.is_topic_published(topic))
00041 multipublisher.unregister()
00042 self.assertFalse(self.is_topic_published(topic))
00043
00044 def test_register_client(self):
00045 """ Adds a publisher then removes it. """
00046 topic = "/test_register_client"
00047 msg_type = "std_msgs/String"
00048 client_id = "client1"
00049
00050 p = MultiPublisher(topic, msg_type)
00051 self.assertFalse(p.has_clients())
00052
00053 p.register_client(client_id)
00054 self.assertTrue(p.has_clients())
00055
00056 p.unregister_client(client_id)
00057 self.assertFalse(p.has_clients())
00058
00059 def test_register_multiple_clients(self):
00060 """ Adds multiple publishers then removes them. """
00061 topic = "/test_register_multiple_clients"
00062 msg_type = "std_msgs/String"
00063
00064 p = MultiPublisher(topic, msg_type)
00065 self.assertFalse(p.has_clients())
00066
00067 for i in range(1000):
00068 p.register_client("client%d" % i)
00069 self.assertTrue(p.has_clients())
00070
00071 for i in range(1000):
00072 self.assertTrue(p.has_clients())
00073 p.unregister_client("client%d" % i)
00074
00075 self.assertFalse(p.has_clients())
00076
00077 def test_verify_type(self):
00078 topic = "/test_verify_type"
00079 msg_type = "std_msgs/String"
00080 othertypes = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
00081 "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
00082 "nav_msgs/OccupancyGrid", "geometry_msgs/Point32",
00083 "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
00084 "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
00085 "sensor_msgs/PointCloud2"]
00086
00087 p = MultiPublisher(topic, msg_type)
00088 p.verify_type(msg_type)
00089 for othertype in othertypes:
00090 self.assertRaises(TypeConflictException, p.verify_type, othertype)
00091
00092 def test_publish(self):
00093 """ Make sure that publishing works """
00094 topic = "/test_publish"
00095 msg_type = "std_msgs/String"
00096 msg = {"data": "why halo thar"}
00097
00098 received = {"msg": None}
00099 def cb(msg):
00100 received["msg"] = msg
00101
00102 rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb)
00103 p = MultiPublisher(topic, msg_type)
00104 p.publish(msg)
00105
00106 sleep(0.5)
00107
00108 self.assertEqual(received["msg"].data, msg["data"])
00109
00110 def test_bad_publish(self):
00111 """ Make sure that bad publishing fails """
00112 topic = "/test_publish"
00113 msg_type = "std_msgs/String"
00114 msg = {"data": 3}
00115
00116 p = MultiPublisher(topic, msg_type)
00117 self.assertRaises(FieldTypeMismatchException, p.publish, msg)
00118
00119
00120 PKG = 'rosbridge_library'
00121 NAME = 'test_multi_publisher'
00122 if __name__ == '__main__':
00123 rostest.unitrun(PKG, NAME, TestMultiPublisher)
00124