test_multi_publisher.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 PKG = 'rosbridge_library'
00003 import roslib; roslib.load_manifest(PKG); roslib.load_manifest("std_msgs")
00004 import rospy
00005 from rospy import get_published_topics
00006 
00007 from time import sleep, time
00008 
00009 from rosbridge_library.internal.publishers import *
00010 from rosbridge_library.internal.topics import *
00011 from rosbridge_library.internal import ros_loader
00012 from rosbridge_library.internal.message_conversion import *
00013 from std_msgs.msg import String, Int32
00014 
00015 import unittest
00016 
00017 class TestMultiPublisher(unittest.TestCase):
00018 
00019     def setUp(self):
00020         rospy.init_node("test_multi_publisher")
00021 
00022     def is_topic_published(self, topicname):
00023         return topicname in dict(get_published_topics()).keys()
00024 
00025     def test_register_multipublisher(self):
00026         """ Register a publisher on a clean topic with a good msg type """
00027         topic = "/test_register_multipublisher"
00028         msg_type = "std_msgs/String"
00029 
00030         self.assertFalse(self.is_topic_published(topic))
00031         multipublisher = MultiPublisher(topic, msg_type)
00032         self.assertTrue(self.is_topic_published(topic))
00033 
00034     def test_unregister_multipublisher(self):
00035         """ Register and unregister a publisher on a clean topic with a good msg type """
00036         topic = "/test_unregister_multipublisher"
00037         msg_type = "std_msgs/String"
00038 
00039         self.assertFalse(self.is_topic_published(topic))
00040         multipublisher = MultiPublisher(topic, msg_type)
00041         self.assertTrue(self.is_topic_published(topic))
00042         multipublisher.unregister()
00043         self.assertFalse(self.is_topic_published(topic))
00044 
00045     def test_register_client(self):
00046         """ Adds a publisher then removes it.  """
00047         topic = "/test_register_client"
00048         msg_type = "std_msgs/String"
00049         client_id = "client1"
00050 
00051         p = MultiPublisher(topic, msg_type)
00052         self.assertFalse(p.has_clients())
00053 
00054         p.register_client(client_id)
00055         self.assertTrue(p.has_clients())
00056 
00057         p.unregister_client(client_id)
00058         self.assertFalse(p.has_clients())
00059 
00060     def test_register_multiple_clients(self):
00061         """ Adds multiple publishers then removes them.  """
00062         topic = "/test_register_multiple_clients"
00063         msg_type = "std_msgs/String"
00064 
00065         p = MultiPublisher(topic, msg_type)
00066         self.assertFalse(p.has_clients())
00067 
00068         for i in range(1000):
00069             p.register_client("client%d" % i)
00070             self.assertTrue(p.has_clients())
00071 
00072         for i in range(1000):
00073             self.assertTrue(p.has_clients())
00074             p.unregister_client("client%d" % i)
00075 
00076         self.assertFalse(p.has_clients())
00077 
00078     def test_verify_type(self):
00079         topic = "/test_verify_type"
00080         msg_type = "std_msgs/String"
00081         othertypes = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
00082         "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
00083         "nav_msgs/OccupancyGrid", "geometry_msgs/Point32",
00084         "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
00085         "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
00086         "sensor_msgs/PointCloud2"]
00087 
00088         p = MultiPublisher(topic, msg_type)
00089         p.verify_type(msg_type)
00090         for othertype in othertypes:
00091             self.assertRaises(TypeConflictException, p.verify_type, othertype)
00092 
00093     def test_publish(self):
00094         """ Make sure that publishing works """
00095         topic = "/test_publish"
00096         msg_type = "std_msgs/String"
00097         msg = {"data": "why halo thar"}
00098 
00099         received = {"msg": None}
00100         def cb(msg):
00101             received["msg"] = msg
00102 
00103         rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb)
00104         p = MultiPublisher(topic, msg_type)
00105         p.publish(msg)
00106 
00107         sleep(0.5)
00108 
00109         self.assertEqual(received["msg"].data, msg["data"])
00110 
00111     def test_bad_publish(self):
00112         """ Make sure that bad publishing fails """
00113         topic = "/test_publish"
00114         msg_type = "std_msgs/String"
00115         msg = {"data": 3}
00116 
00117         p = MultiPublisher(topic, msg_type)
00118         self.assertRaises(FieldTypeMismatchException, p.publish, msg)
00119 
00120 if __name__ == '__main__':
00121     import rostest
00122     rostest.rosrun(PKG, 'test_multi_publisher', TestMultiPublisher)


rosbridge_test
Author(s): Jonathan Mace
autogenerated on Thu Jan 2 2014 11:54:04