Go to the documentation of this file.00001
00002 PKG = 'rosbridge_library'
00003 import roslib; roslib.load_manifest(PKG); roslib.load_manifest("std_msgs")
00004 import rospy
00005 from rospy import get_published_topics
00006
00007 from time import sleep, time
00008
00009 from rosbridge_library.internal.publishers import *
00010 from rosbridge_library.internal.topics import *
00011 from rosbridge_library.internal import ros_loader
00012 from rosbridge_library.internal.message_conversion import *
00013 from std_msgs.msg import String, Int32
00014
00015 import unittest
00016
00017 class TestMultiPublisher(unittest.TestCase):
00018
00019 def setUp(self):
00020 rospy.init_node("test_multi_publisher")
00021
00022 def is_topic_published(self, topicname):
00023 return topicname in dict(get_published_topics()).keys()
00024
00025 def test_register_multipublisher(self):
00026 """ Register a publisher on a clean topic with a good msg type """
00027 topic = "/test_register_multipublisher"
00028 msg_type = "std_msgs/String"
00029
00030 self.assertFalse(self.is_topic_published(topic))
00031 multipublisher = MultiPublisher(topic, msg_type)
00032 self.assertTrue(self.is_topic_published(topic))
00033
00034 def test_unregister_multipublisher(self):
00035 """ Register and unregister a publisher on a clean topic with a good msg type """
00036 topic = "/test_unregister_multipublisher"
00037 msg_type = "std_msgs/String"
00038
00039 self.assertFalse(self.is_topic_published(topic))
00040 multipublisher = MultiPublisher(topic, msg_type)
00041 self.assertTrue(self.is_topic_published(topic))
00042 multipublisher.unregister()
00043 self.assertFalse(self.is_topic_published(topic))
00044
00045 def test_register_client(self):
00046 """ Adds a publisher then removes it. """
00047 topic = "/test_register_client"
00048 msg_type = "std_msgs/String"
00049 client_id = "client1"
00050
00051 p = MultiPublisher(topic, msg_type)
00052 self.assertFalse(p.has_clients())
00053
00054 p.register_client(client_id)
00055 self.assertTrue(p.has_clients())
00056
00057 p.unregister_client(client_id)
00058 self.assertFalse(p.has_clients())
00059
00060 def test_register_multiple_clients(self):
00061 """ Adds multiple publishers then removes them. """
00062 topic = "/test_register_multiple_clients"
00063 msg_type = "std_msgs/String"
00064
00065 p = MultiPublisher(topic, msg_type)
00066 self.assertFalse(p.has_clients())
00067
00068 for i in range(1000):
00069 p.register_client("client%d" % i)
00070 self.assertTrue(p.has_clients())
00071
00072 for i in range(1000):
00073 self.assertTrue(p.has_clients())
00074 p.unregister_client("client%d" % i)
00075
00076 self.assertFalse(p.has_clients())
00077
00078 def test_verify_type(self):
00079 topic = "/test_verify_type"
00080 msg_type = "std_msgs/String"
00081 othertypes = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
00082 "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
00083 "nav_msgs/OccupancyGrid", "geometry_msgs/Point32",
00084 "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
00085 "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
00086 "sensor_msgs/PointCloud2"]
00087
00088 p = MultiPublisher(topic, msg_type)
00089 p.verify_type(msg_type)
00090 for othertype in othertypes:
00091 self.assertRaises(TypeConflictException, p.verify_type, othertype)
00092
00093 def test_publish(self):
00094 """ Make sure that publishing works """
00095 topic = "/test_publish"
00096 msg_type = "std_msgs/String"
00097 msg = {"data": "why halo thar"}
00098
00099 received = {"msg": None}
00100 def cb(msg):
00101 received["msg"] = msg
00102
00103 rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb)
00104 p = MultiPublisher(topic, msg_type)
00105 p.publish(msg)
00106
00107 sleep(0.5)
00108
00109 self.assertEqual(received["msg"].data, msg["data"])
00110
00111 def test_bad_publish(self):
00112 """ Make sure that bad publishing fails """
00113 topic = "/test_publish"
00114 msg_type = "std_msgs/String"
00115 msg = {"data": 3}
00116
00117 p = MultiPublisher(topic, msg_type)
00118 self.assertRaises(FieldTypeMismatchException, p.publish, msg)
00119
00120 if __name__ == '__main__':
00121 import rostest
00122 rostest.rosrun(PKG, 'test_multi_publisher', TestMultiPublisher)