7 from time
import sleep, time
19 rospy.init_node(
"test_multi_publisher")
22 return topicname
in dict(rospy.get_published_topics()).keys()
25 """ Register a publisher on a clean topic with a good msg type """ 26 topic =
"/test_register_multipublisher" 27 msg_type =
"std_msgs/String" 34 """ Register and unregister a publisher on a clean topic with a good msg type """ 35 topic =
"/test_unregister_multipublisher" 36 msg_type =
"std_msgs/String" 41 multipublisher.unregister()
45 """ Adds a publisher then removes it. """ 46 topic =
"/test_register_client" 47 msg_type =
"std_msgs/String" 51 self.assertFalse(p.has_clients())
53 p.register_client(client_id)
54 self.assertTrue(p.has_clients())
56 p.unregister_client(client_id)
57 self.assertFalse(p.has_clients())
60 """ Adds multiple publishers then removes them. """ 61 topic =
"/test_register_multiple_clients" 62 msg_type =
"std_msgs/String" 65 self.assertFalse(p.has_clients())
68 p.register_client(
"client%d" % i)
69 self.assertTrue(p.has_clients())
72 self.assertTrue(p.has_clients())
73 p.unregister_client(
"client%d" % i)
75 self.assertFalse(p.has_clients())
78 topic =
"/test_verify_type" 79 msg_type =
"std_msgs/String" 80 othertypes = [
"geometry_msgs/Pose",
"actionlib_msgs/GoalStatus",
81 "geometry_msgs/WrenchStamped",
"stereo_msgs/DisparityImage",
82 "nav_msgs/OccupancyGrid",
"geometry_msgs/Point32",
83 "trajectory_msgs/JointTrajectoryPoint",
"diagnostic_msgs/KeyValue",
84 "visualization_msgs/InteractiveMarkerUpdate",
"nav_msgs/GridCells",
85 "sensor_msgs/PointCloud2"]
88 p.verify_type(msg_type)
89 for othertype
in othertypes:
90 self.assertRaises(TypeConflictException, p.verify_type, othertype)
93 """ Make sure that publishing works """ 94 topic =
"/test_publish" 95 msg_type =
"std_msgs/String" 96 msg = {
"data":
"why halo thar"}
98 received = {
"msg":
None}
100 received[
"msg"] = msg
102 rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb)
108 self.assertEqual(received[
"msg"].data, msg[
"data"])
111 """ Make sure that bad publishing fails """ 112 topic =
"/test_publish" 113 msg_type =
"std_msgs/String" 117 self.assertRaises(FieldTypeMismatchException, p.publish, msg)
120 PKG =
'rosbridge_library' 121 NAME =
'test_multi_publisher' 122 if __name__ ==
'__main__':
123 rostest.unitrun(PKG, NAME, TestMultiPublisher)
def test_register_multiple_clients(self)
def is_topic_published(self, topicname)
def test_bad_publish(self)
def test_unregister_multipublisher(self)
def test_register_multipublisher(self)
def test_verify_type(self)
def test_register_client(self)