7 from time
import sleep, time
16 PKG =
'rosbridge_library'
17 NAME =
'test_multi_publisher'
26 return topicname
in dict(rospy.get_published_topics()).keys()
29 """ Register a publisher on a clean topic with a good msg type """
30 topic =
"/test_register_multipublisher"
31 msg_type =
"std_msgs/String"
38 """ Register and unregister a publisher on a clean topic with a good msg type """
39 topic =
"/test_unregister_multipublisher"
40 msg_type =
"std_msgs/String"
45 multipublisher.unregister()
49 """ Adds a publisher then removes it. """
50 topic =
"/test_register_client"
51 msg_type =
"std_msgs/String"
55 self.assertFalse(p.has_clients())
57 p.register_client(client_id)
58 self.assertTrue(p.has_clients())
60 p.unregister_client(client_id)
61 self.assertFalse(p.has_clients())
64 """ Adds multiple publishers then removes them. """
65 topic =
"/test_register_multiple_clients"
66 msg_type =
"std_msgs/String"
69 self.assertFalse(p.has_clients())
72 p.register_client(
"client%d" % i)
73 self.assertTrue(p.has_clients())
76 self.assertTrue(p.has_clients())
77 p.unregister_client(
"client%d" % i)
79 self.assertFalse(p.has_clients())
82 topic =
"/test_verify_type"
83 msg_type =
"std_msgs/String"
84 othertypes = [
"geometry_msgs/Pose",
"actionlib_msgs/GoalStatus",
85 "geometry_msgs/WrenchStamped",
"stereo_msgs/DisparityImage",
86 "nav_msgs/OccupancyGrid",
"geometry_msgs/Point32",
87 "trajectory_msgs/JointTrajectoryPoint",
"diagnostic_msgs/KeyValue",
88 "visualization_msgs/InteractiveMarkerUpdate",
"nav_msgs/GridCells",
89 "sensor_msgs/PointCloud2"]
92 p.verify_type(msg_type)
93 for othertype
in othertypes:
94 self.assertRaises(TypeConflictException, p.verify_type, othertype)
97 """ Make sure that publishing works """
98 topic =
"/test_publish"
99 msg_type =
"std_msgs/String"
100 msg = {
"data":
"why halo thar"}
102 received = {
"msg":
None}
104 received[
"msg"] = msg
106 rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb)
112 self.assertEqual(received[
"msg"].data, msg[
"data"])
115 """ Make sure that bad publishing fails """
116 topic =
"/test_publish"
117 msg_type =
"std_msgs/String"
121 self.assertRaises(FieldTypeMismatchException, p.publish, msg)
124 if __name__ ==
'__main__':
125 rosunit.unitrun(PKG, NAME, TestMultiPublisher)