test_multi_publisher.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import rospy
4 import rostest
5 import unittest
6 
7 from time import sleep, time
8 
11 from rosbridge_library.internal import ros_loader
13 from std_msgs.msg import String, Int32
14 
15 
16 class TestMultiPublisher(unittest.TestCase):
17 
18  def setUp(self):
19  rospy.init_node("test_multi_publisher")
20 
21  def is_topic_published(self, topicname):
22  return topicname in dict(rospy.get_published_topics()).keys()
23 
25  """ Register a publisher on a clean topic with a good msg type """
26  topic = "/test_register_multipublisher"
27  msg_type = "std_msgs/String"
28 
29  self.assertFalse(self.is_topic_published(topic))
30  multipublisher = MultiPublisher(topic, msg_type)
31  self.assertTrue(self.is_topic_published(topic))
32 
34  """ Register and unregister a publisher on a clean topic with a good msg type """
35  topic = "/test_unregister_multipublisher"
36  msg_type = "std_msgs/String"
37 
38  self.assertFalse(self.is_topic_published(topic))
39  multipublisher = MultiPublisher(topic, msg_type)
40  self.assertTrue(self.is_topic_published(topic))
41  multipublisher.unregister()
42  self.assertFalse(self.is_topic_published(topic))
43 
45  """ Adds a publisher then removes it. """
46  topic = "/test_register_client"
47  msg_type = "std_msgs/String"
48  client_id = "client1"
49 
50  p = MultiPublisher(topic, msg_type)
51  self.assertFalse(p.has_clients())
52 
53  p.register_client(client_id)
54  self.assertTrue(p.has_clients())
55 
56  p.unregister_client(client_id)
57  self.assertFalse(p.has_clients())
58 
60  """ Adds multiple publishers then removes them. """
61  topic = "/test_register_multiple_clients"
62  msg_type = "std_msgs/String"
63 
64  p = MultiPublisher(topic, msg_type)
65  self.assertFalse(p.has_clients())
66 
67  for i in range(1000):
68  p.register_client("client%d" % i)
69  self.assertTrue(p.has_clients())
70 
71  for i in range(1000):
72  self.assertTrue(p.has_clients())
73  p.unregister_client("client%d" % i)
74 
75  self.assertFalse(p.has_clients())
76 
77  def test_verify_type(self):
78  topic = "/test_verify_type"
79  msg_type = "std_msgs/String"
80  othertypes = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
81  "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
82  "nav_msgs/OccupancyGrid", "geometry_msgs/Point32",
83  "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
84  "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
85  "sensor_msgs/PointCloud2"]
86 
87  p = MultiPublisher(topic, msg_type)
88  p.verify_type(msg_type)
89  for othertype in othertypes:
90  self.assertRaises(TypeConflictException, p.verify_type, othertype)
91 
92  def test_publish(self):
93  """ Make sure that publishing works """
94  topic = "/test_publish"
95  msg_type = "std_msgs/String"
96  msg = {"data": "why halo thar"}
97 
98  received = {"msg": None}
99  def cb(msg):
100  received["msg"] = msg
101 
102  rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb)
103  p = MultiPublisher(topic, msg_type)
104  p.publish(msg)
105 
106  sleep(0.5)
107 
108  self.assertEqual(received["msg"].data, msg["data"])
109 
110  def test_bad_publish(self):
111  """ Make sure that bad publishing fails """
112  topic = "/test_publish"
113  msg_type = "std_msgs/String"
114  msg = {"data": 3}
115 
116  p = MultiPublisher(topic, msg_type)
117  self.assertRaises(FieldTypeMismatchException, p.publish, msg)
118 
119 
120 PKG = 'rosbridge_library'
121 NAME = 'test_multi_publisher'
122 if __name__ == '__main__':
123  rostest.unitrun(PKG, NAME, TestMultiPublisher)
124 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri May 10 2019 02:17:02