16 from json
import loads, dumps
22 rospy.init_node(
"test_advertise")
23 manager.unregister_timeout = 1.0
26 return topicname
in dict(rospy.get_published_topics()).keys()
31 msg = {
"op":
"advertise"}
32 self.assertRaises(MissingArgumentException, adv.advertise,
loads(
dumps(msg)))
34 msg = {
"op":
"advertise",
"topic":
"/jon"}
35 self.assertRaises(MissingArgumentException, adv.advertise,
loads(
dumps(msg)))
37 msg = {
"op":
"advertise",
"type":
"std_msgs/String"}
38 self.assertRaises(MissingArgumentException, adv.advertise,
loads(
dumps(msg)))
44 msg = {
"op":
"advertise",
"topic": 3,
"type":
"std_msgs/String"}
45 self.assertRaises(InvalidArgumentException, adv.advertise,
loads(
dumps(msg)))
47 msg = {
"op":
"advertise",
"topic":
"/jon",
"type": 3}
48 self.assertRaises(InvalidArgumentException, adv.advertise,
loads(
dumps(msg)))
51 invalid = [
"",
"/",
"//",
"///",
"////",
"/////",
"bad",
"stillbad",
52 "not/better/still",
"not//better//still",
"not///better///still",
53 "better/",
"better//",
"better///",
"/better",
"//better",
"///better",
59 for invalid_type
in invalid:
60 msg = {
"op":
"advertise",
"topic":
"/test_invalid_msg_typestrings",
62 self.assertRaises(ros_loader.InvalidTypeStringException,
66 nonexistent = [
"wangle_msgs/Jam",
"whistleblower_msgs/Document",
67 "sexual_harrassment_msgs/UnwantedAdvance",
"coercion_msgs/Bribe",
68 "airconditioning_msgs/Cold",
"pr2thoughts_msgs/Escape"]
73 for invalid_type
in nonexistent:
74 msg = {
"op":
"advertise",
"topic":
"/test_invalid_msg_package",
76 self.assertRaises(ros_loader.InvalidPackageException,
80 no_msgs = [
"roslib/Time",
"roslib/Duration",
"roslib/Header",
81 "std_srvs/ConflictedMsg",
"topic_tools/MessageMessage"]
86 for invalid_type
in no_msgs:
87 msg = {
"op":
"advertise",
"topic":
"/test_invalid_msg_module",
89 self.assertRaises(ros_loader.InvalidModuleException,
93 nonexistent = [
"roscpp/Time",
"roscpp/Duration",
"roscpp/Header",
94 "rospy/Time",
"rospy/Duration",
"rospy/Header",
"std_msgs/Spool",
95 "geometry_msgs/Tetrahedron",
"sensor_msgs/TelepathyUnit"]
100 for invalid_type
in nonexistent:
101 msg = {
"op":
"advertise",
"topic":
"/test_invalid_msg_classes",
102 "type": invalid_type}
103 self.assertRaises(ros_loader.InvalidClassException,
107 assortedmsgs = [
"geometry_msgs/Pose",
"actionlib_msgs/GoalStatus",
108 "geometry_msgs/WrenchStamped",
"stereo_msgs/DisparityImage",
109 "nav_msgs/OccupancyGrid",
"geometry_msgs/Point32",
"std_msgs/String",
110 "trajectory_msgs/JointTrajectoryPoint",
"diagnostic_msgs/KeyValue",
111 "visualization_msgs/InteractiveMarkerUpdate",
"nav_msgs/GridCells",
112 "sensor_msgs/PointCloud2"]
117 for valid_type
in assortedmsgs:
118 msg = {
"op":
"advertise",
"topic":
"/" + valid_type,
126 topic =
"/test_do_advertise" 127 type =
"std_msgs/String" 129 msg = {
"op":
"advertise",
"topic": topic,
"type": type}
134 sleep(manager.unregister_timeout * 1.1)
138 PKG =
'rosbridge_library' 139 NAME =
'test_advertise' 140 if __name__ ==
'__main__':
141 rostest.unitrun(PKG, NAME, TestAdvertise)
def test_invalid_msg_classes(self)
def dumps(ob, sort_keys=False)
def test_invalid_arguments(self)
def is_topic_published(self, topicname)
def test_invalid_msg_typestrings(self)
def test_missing_arguments(self)
def test_invalid_msg_package(self)
def test_valid_msg_classes(self)
def test_invalid_msg_module(self)
def test_do_advertise(self)