test_multi_publisher.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import rospy
4 import rosunit
5 import unittest
6 
7 from time import sleep, time
8 
11 from rosbridge_library.internal import ros_loader
13 from std_msgs.msg import String, Int32
14 
15 
16 PKG = 'rosbridge_library'
17 NAME = 'test_multi_publisher'
18 
19 
20 class TestMultiPublisher(unittest.TestCase):
21 
22  def setUp(self):
23  rospy.init_node(NAME)
24 
25  def is_topic_published(self, topicname):
26  return topicname in dict(rospy.get_published_topics()).keys()
27 
29  """ Register a publisher on a clean topic with a good msg type """
30  topic = "/test_register_multipublisher"
31  msg_type = "std_msgs/String"
32 
33  self.assertFalse(self.is_topic_published(topic))
34  multipublisher = MultiPublisher(topic, msg_type)
35  self.assertTrue(self.is_topic_published(topic))
36 
38  """ Register and unregister a publisher on a clean topic with a good msg type """
39  topic = "/test_unregister_multipublisher"
40  msg_type = "std_msgs/String"
41 
42  self.assertFalse(self.is_topic_published(topic))
43  multipublisher = MultiPublisher(topic, msg_type)
44  self.assertTrue(self.is_topic_published(topic))
45  multipublisher.unregister()
46  self.assertFalse(self.is_topic_published(topic))
47 
49  """ Adds a publisher then removes it. """
50  topic = "/test_register_client"
51  msg_type = "std_msgs/String"
52  client_id = "client1"
53 
54  p = MultiPublisher(topic, msg_type)
55  self.assertFalse(p.has_clients())
56 
57  p.register_client(client_id)
58  self.assertTrue(p.has_clients())
59 
60  p.unregister_client(client_id)
61  self.assertFalse(p.has_clients())
62 
64  """ Adds multiple publishers then removes them. """
65  topic = "/test_register_multiple_clients"
66  msg_type = "std_msgs/String"
67 
68  p = MultiPublisher(topic, msg_type)
69  self.assertFalse(p.has_clients())
70 
71  for i in range(1000):
72  p.register_client("client%d" % i)
73  self.assertTrue(p.has_clients())
74 
75  for i in range(1000):
76  self.assertTrue(p.has_clients())
77  p.unregister_client("client%d" % i)
78 
79  self.assertFalse(p.has_clients())
80 
81  def test_verify_type(self):
82  topic = "/test_verify_type"
83  msg_type = "std_msgs/String"
84  othertypes = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
85  "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
86  "nav_msgs/OccupancyGrid", "geometry_msgs/Point32",
87  "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
88  "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
89  "sensor_msgs/PointCloud2"]
90 
91  p = MultiPublisher(topic, msg_type)
92  p.verify_type(msg_type)
93  for othertype in othertypes:
94  self.assertRaises(TypeConflictException, p.verify_type, othertype)
95 
96  def test_publish(self):
97  """ Make sure that publishing works """
98  topic = "/test_publish"
99  msg_type = "std_msgs/String"
100  msg = {"data": "why halo thar"}
101 
102  received = {"msg": None}
103  def cb(msg):
104  received["msg"] = msg
105 
106  rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb)
107  p = MultiPublisher(topic, msg_type)
108  p.publish(msg)
109 
110  sleep(0.5)
111 
112  self.assertEqual(received["msg"].data, msg["data"])
113 
114  def test_bad_publish(self):
115  """ Make sure that bad publishing fails """
116  topic = "/test_publish"
117  msg_type = "std_msgs/String"
118  msg = {"data": 3}
119 
120  p = MultiPublisher(topic, msg_type)
121  self.assertRaises(FieldTypeMismatchException, p.publish, msg)
122 
123 
124 if __name__ == '__main__':
125  rosunit.unitrun(PKG, NAME, TestMultiPublisher)
126 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri Oct 21 2022 02:45:18