test_multi_publisher.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 
00007 from time import sleep, time
00008 
00009 from rosbridge_library.internal.publishers import *
00010 from rosbridge_library.internal.topics import *
00011 from rosbridge_library.internal import ros_loader
00012 from rosbridge_library.internal.message_conversion import *
00013 from std_msgs.msg import String, Int32
00014 
00015 
00016 class TestMultiPublisher(unittest.TestCase):
00017 
00018     def setUp(self):
00019         rospy.init_node("test_multi_publisher")
00020 
00021     def is_topic_published(self, topicname):
00022         return topicname in dict(rospy.get_published_topics()).keys()
00023 
00024     def test_register_multipublisher(self):
00025         """ Register a publisher on a clean topic with a good msg type """
00026         topic = "/test_register_multipublisher"
00027         msg_type = "std_msgs/String"
00028 
00029         self.assertFalse(self.is_topic_published(topic))
00030         multipublisher = MultiPublisher(topic, msg_type)
00031         self.assertTrue(self.is_topic_published(topic))
00032 
00033     def test_unregister_multipublisher(self):
00034         """ Register and unregister a publisher on a clean topic with a good msg type """
00035         topic = "/test_unregister_multipublisher"
00036         msg_type = "std_msgs/String"
00037 
00038         self.assertFalse(self.is_topic_published(topic))
00039         multipublisher = MultiPublisher(topic, msg_type)
00040         self.assertTrue(self.is_topic_published(topic))
00041         multipublisher.unregister()
00042         self.assertFalse(self.is_topic_published(topic))
00043 
00044     def test_register_client(self):
00045         """ Adds a publisher then removes it.  """
00046         topic = "/test_register_client"
00047         msg_type = "std_msgs/String"
00048         client_id = "client1"
00049 
00050         p = MultiPublisher(topic, msg_type)
00051         self.assertFalse(p.has_clients())
00052 
00053         p.register_client(client_id)
00054         self.assertTrue(p.has_clients())
00055 
00056         p.unregister_client(client_id)
00057         self.assertFalse(p.has_clients())
00058 
00059     def test_register_multiple_clients(self):
00060         """ Adds multiple publishers then removes them.  """
00061         topic = "/test_register_multiple_clients"
00062         msg_type = "std_msgs/String"
00063 
00064         p = MultiPublisher(topic, msg_type)
00065         self.assertFalse(p.has_clients())
00066 
00067         for i in range(1000):
00068             p.register_client("client%d" % i)
00069             self.assertTrue(p.has_clients())
00070 
00071         for i in range(1000):
00072             self.assertTrue(p.has_clients())
00073             p.unregister_client("client%d" % i)
00074 
00075         self.assertFalse(p.has_clients())
00076 
00077     def test_verify_type(self):
00078         topic = "/test_verify_type"
00079         msg_type = "std_msgs/String"
00080         othertypes = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
00081         "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
00082         "nav_msgs/OccupancyGrid", "geometry_msgs/Point32",
00083         "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
00084         "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
00085         "sensor_msgs/PointCloud2"]
00086 
00087         p = MultiPublisher(topic, msg_type)
00088         p.verify_type(msg_type)
00089         for othertype in othertypes:
00090             self.assertRaises(TypeConflictException, p.verify_type, othertype)
00091 
00092     def test_publish(self):
00093         """ Make sure that publishing works """
00094         topic = "/test_publish"
00095         msg_type = "std_msgs/String"
00096         msg = {"data": "why halo thar"}
00097 
00098         received = {"msg": None}
00099         def cb(msg):
00100             received["msg"] = msg
00101 
00102         rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb)
00103         p = MultiPublisher(topic, msg_type)
00104         p.publish(msg)
00105 
00106         sleep(0.5)
00107 
00108         self.assertEqual(received["msg"].data, msg["data"])
00109 
00110     def test_bad_publish(self):
00111         """ Make sure that bad publishing fails """
00112         topic = "/test_publish"
00113         msg_type = "std_msgs/String"
00114         msg = {"data": 3}
00115 
00116         p = MultiPublisher(topic, msg_type)
00117         self.assertRaises(FieldTypeMismatchException, p.publish, msg)
00118 
00119 
00120 PKG = 'rosbridge_library'
00121 NAME = 'test_multi_publisher'
00122 if __name__ == '__main__':
00123     rostest.unitrun(PKG, NAME, TestMultiPublisher)
00124 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Thu Jun 6 2019 21:51:43