7 from time
import sleep, time
16 PKG =
'rosbridge_library' 17 NAME =
'test_multi_publisher' 26 return topicname
in dict(rospy.get_published_topics()).keys()
29 """ Register a publisher on a clean topic with a good msg type """ 30 topic =
"/test_register_multipublisher" 31 msg_type =
"std_msgs/String" 38 """ Register and unregister a publisher on a clean topic with a good msg type """ 39 topic =
"/test_unregister_multipublisher" 40 msg_type =
"std_msgs/String" 45 multipublisher.unregister()
49 """ Adds a publisher then removes it. """ 50 topic =
"/test_register_client" 51 msg_type =
"std_msgs/String" 55 self.assertFalse(p.has_clients())
57 p.register_client(client_id)
58 self.assertTrue(p.has_clients())
60 p.unregister_client(client_id)
61 self.assertFalse(p.has_clients())
64 """ Adds multiple publishers then removes them. """ 65 topic =
"/test_register_multiple_clients" 66 msg_type =
"std_msgs/String" 69 self.assertFalse(p.has_clients())
72 p.register_client(
"client%d" % i)
73 self.assertTrue(p.has_clients())
76 self.assertTrue(p.has_clients())
77 p.unregister_client(
"client%d" % i)
79 self.assertFalse(p.has_clients())
82 topic =
"/test_verify_type" 83 msg_type =
"std_msgs/String" 84 othertypes = [
"geometry_msgs/Pose",
"actionlib_msgs/GoalStatus",
85 "geometry_msgs/WrenchStamped",
"stereo_msgs/DisparityImage",
86 "nav_msgs/OccupancyGrid",
"geometry_msgs/Point32",
87 "trajectory_msgs/JointTrajectoryPoint",
"diagnostic_msgs/KeyValue",
88 "visualization_msgs/InteractiveMarkerUpdate",
"nav_msgs/GridCells",
89 "sensor_msgs/PointCloud2"]
92 p.verify_type(msg_type)
93 for othertype
in othertypes:
94 self.assertRaises(TypeConflictException, p.verify_type, othertype)
97 """ Make sure that publishing works """ 98 topic =
"/test_publish" 99 msg_type =
"std_msgs/String" 100 msg = {
"data":
"why halo thar"}
102 received = {
"msg":
None}
104 received[
"msg"] = msg
106 rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb)
112 self.assertEqual(received[
"msg"].data, msg[
"data"])
115 """ Make sure that bad publishing fails """ 116 topic =
"/test_publish" 117 msg_type =
"std_msgs/String" 121 self.assertRaises(FieldTypeMismatchException, p.publish, msg)
124 if __name__ ==
'__main__':
125 rosunit.unitrun(PKG, NAME, TestMultiPublisher)
def test_register_multiple_clients(self)
def is_topic_published(self, topicname)
def test_bad_publish(self)
def test_unregister_multipublisher(self)
def test_register_multipublisher(self)
def test_verify_type(self)
def test_register_client(self)