16 from json
import loads, dumps
19 PKG =
'rosbridge_library' 20 NAME =
'test_advertise' 28 manager.unregister_timeout = 1.0
31 return topicname
in dict(rospy.get_published_topics()).keys()
36 msg = {
"op":
"advertise"}
37 self.assertRaises(MissingArgumentException, adv.advertise,
loads(
dumps(msg)))
39 msg = {
"op":
"advertise",
"topic":
"/jon"}
40 self.assertRaises(MissingArgumentException, adv.advertise,
loads(
dumps(msg)))
42 msg = {
"op":
"advertise",
"type":
"std_msgs/String"}
43 self.assertRaises(MissingArgumentException, adv.advertise,
loads(
dumps(msg)))
49 msg = {
"op":
"advertise",
"topic": 3,
"type":
"std_msgs/String"}
50 self.assertRaises(InvalidArgumentException, adv.advertise,
loads(
dumps(msg)))
52 msg = {
"op":
"advertise",
"topic":
"/jon",
"type": 3}
53 self.assertRaises(InvalidArgumentException, adv.advertise,
loads(
dumps(msg)))
56 invalid = [
"",
"/",
"//",
"///",
"////",
"/////",
"bad",
"stillbad",
57 "not/better/still",
"not//better//still",
"not///better///still",
58 "better/",
"better//",
"better///",
"/better",
"//better",
"///better",
64 for invalid_type
in invalid:
65 msg = {
"op":
"advertise",
"topic":
"/test_invalid_msg_typestrings",
67 self.assertRaises(ros_loader.InvalidTypeStringException,
71 nonexistent = [
"wangle_msgs/Jam",
"whistleblower_msgs/Document",
72 "sexual_harrassment_msgs/UnwantedAdvance",
"coercion_msgs/Bribe",
73 "airconditioning_msgs/Cold",
"pr2thoughts_msgs/Escape"]
78 for invalid_type
in nonexistent:
79 msg = {
"op":
"advertise",
"topic":
"/test_invalid_msg_package",
81 self.assertRaises(ros_loader.InvalidPackageException,
85 no_msgs = [
"roslib/Time",
"roslib/Duration",
"roslib/Header",
86 "std_srvs/ConflictedMsg",
"topic_tools/MessageMessage"]
91 for invalid_type
in no_msgs:
92 msg = {
"op":
"advertise",
"topic":
"/test_invalid_msg_module",
94 self.assertRaises(ros_loader.InvalidModuleException,
98 nonexistent = [
"roscpp/Time",
"roscpp/Duration",
"roscpp/Header",
99 "rospy/Time",
"rospy/Duration",
"rospy/Header",
"std_msgs/Spool",
100 "geometry_msgs/Tetrahedron",
"sensor_msgs/TelepathyUnit"]
105 for invalid_type
in nonexistent:
106 msg = {
"op":
"advertise",
"topic":
"/test_invalid_msg_classes",
107 "type": invalid_type}
108 self.assertRaises(ros_loader.InvalidClassException,
112 assortedmsgs = [
"geometry_msgs/Pose",
"actionlib_msgs/GoalStatus",
113 "geometry_msgs/WrenchStamped",
"stereo_msgs/DisparityImage",
114 "nav_msgs/OccupancyGrid",
"geometry_msgs/Point32",
"std_msgs/String",
115 "trajectory_msgs/JointTrajectoryPoint",
"diagnostic_msgs/KeyValue",
116 "visualization_msgs/InteractiveMarkerUpdate",
"nav_msgs/GridCells",
117 "sensor_msgs/PointCloud2"]
122 for valid_type
in assortedmsgs:
123 msg = {
"op":
"advertise",
"topic":
"/" + valid_type,
131 topic =
"/test_do_advertise" 132 type =
"std_msgs/String" 134 msg = {
"op":
"advertise",
"topic": topic,
"type": type}
139 sleep(manager.unregister_timeout * 1.1)
143 if __name__ ==
'__main__':
144 rosunit.unitrun(PKG, NAME, TestAdvertise)
def test_invalid_msg_classes(self)
def dumps(ob, sort_keys=False)
def test_invalid_arguments(self)
def is_topic_published(self, topicname)
def test_invalid_msg_typestrings(self)
def test_missing_arguments(self)
def test_invalid_msg_package(self)
def test_valid_msg_classes(self)
def test_invalid_msg_module(self)
def test_do_advertise(self)