00001
00002 PKG = 'rosbridge_library'
00003 import roslib
00004 roslib.load_manifest(PKG)
00005 roslib.load_manifest("std_msgs")
00006 import rospy
00007 from rospy import get_published_topics
00008
00009 from rosbridge_library.protocol import Protocol
00010 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
00011 from rosbridge_library.capabilities.advertise import Registration, Advertise
00012 from rosbridge_library.internal.publishers import manager
00013 from rosbridge_library.internal import ros_loader
00014
00015 from std_msgs.msg import String
00016
00017 from json import loads, dumps
00018
00019 import unittest
00020
00021
00022 class TestAdvertise(unittest.TestCase):
00023
00024 def setUp(self):
00025 rospy.init_node("test_advertise")
00026
00027 def is_topic_published(self, topicname):
00028 return topicname in dict(get_published_topics()).keys()
00029
00030 def test_missing_arguments(self):
00031 proto = Protocol("hello")
00032 adv = Advertise(proto)
00033 msg = {"op": "advertise"}
00034 self.assertRaises(MissingArgumentException, adv.advertise, loads(dumps(msg)))
00035
00036 msg = {"op": "advertise", "topic": "/jon"}
00037 self.assertRaises(MissingArgumentException, adv.advertise, loads(dumps(msg)))
00038
00039 msg = {"op": "advertise", "type": "std_msgs/String"}
00040 self.assertRaises(MissingArgumentException, adv.advertise, loads(dumps(msg)))
00041
00042 def test_invalid_arguments(self):
00043 proto = Protocol("hello")
00044 adv = Advertise(proto)
00045
00046 msg = {"op": "advertise", "topic": 3, "type": "std_msgs/String"}
00047 self.assertRaises(InvalidArgumentException, adv.advertise, loads(dumps(msg)))
00048
00049 msg = {"op": "advertise", "topic": "/jon", "type": 3}
00050 self.assertRaises(InvalidArgumentException, adv.advertise, loads(dumps(msg)))
00051
00052 def test_invalid_msg_typestrings(self):
00053 invalid = ["", "/", "//", "///", "////", "/////", "bad", "stillbad",
00054 "not/better/still", "not//better//still", "not///better///still",
00055 "better/", "better//", "better///", "/better", "//better", "///better",
00056 "this\isbad", "\\"]
00057
00058 proto = Protocol("hello")
00059 adv = Advertise(proto)
00060
00061 for invalid_type in invalid:
00062 msg = {"op": "advertise", "topic": "/test_invalid_msg_typestrings",
00063 "type": invalid_type}
00064 self.assertRaises(ros_loader.InvalidTypeStringException,
00065 adv.advertise, loads(dumps(msg)))
00066
00067 def test_invalid_msg_package(self):
00068 nonexistent = ["wangle_msgs/Jam", "whistleblower_msgs/Document",
00069 "sexual_harrassment_msgs/UnwantedAdvance", "coercion_msgs/Bribe",
00070 "airconditioning_msgs/Cold", "pr2thoughts_msgs/Escape"]
00071
00072 proto = Protocol("hello")
00073 adv = Advertise(proto)
00074
00075 for invalid_type in nonexistent:
00076 msg = {"op": "advertise", "topic": "/test_invalid_msg_package",
00077 "type": invalid_type}
00078 self.assertRaises(ros_loader.InvalidPackageException,
00079 adv.advertise, loads(dumps(msg)))
00080
00081 def test_invalid_msg_module(self):
00082 no_msgs = ["roslib/Time", "roslib/Duration", "roslib/Header",
00083 "std_srvs/ConflictedMsg", "topic_tools/MessageMessage"]
00084
00085 proto = Protocol("hello")
00086 adv = Advertise(proto)
00087
00088 for invalid_type in no_msgs:
00089 msg = {"op": "advertise", "topic": "/test_invalid_msg_module",
00090 "type": invalid_type}
00091 self.assertRaises(ros_loader.InvalidModuleException,
00092 adv.advertise, loads(dumps(msg)))
00093
00094 def test_invalid_msg_classes(self):
00095 nonexistent = ["roscpp/Time", "roscpp/Duration", "roscpp/Header",
00096 "rospy/Time", "rospy/Duration", "rospy/Header", "std_msgs/Spool",
00097 "geometry_msgs/Tetrahedron", "sensor_msgs/TelepathyUnit"]
00098
00099 proto = Protocol("hello")
00100 adv = Advertise(proto)
00101
00102 for invalid_type in nonexistent:
00103 msg = {"op": "advertise", "topic": "/test_invalid_msg_classes",
00104 "type": invalid_type}
00105 self.assertRaises(ros_loader.InvalidClassException,
00106 adv.advertise, loads(dumps(msg)))
00107
00108 def test_valid_msg_classes(self):
00109 assortedmsgs = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
00110 "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
00111 "nav_msgs/OccupancyGrid", "geometry_msgs/Point32", "std_msgs/String",
00112 "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
00113 "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
00114 "sensor_msgs/PointCloud2"]
00115
00116 proto = Protocol("hello")
00117 adv = Advertise(proto)
00118
00119 for valid_type in assortedmsgs:
00120 msg = {"op": "advertise", "topic": "/" + valid_type,
00121 "type": valid_type}
00122 adv.advertise(loads(dumps(msg)))
00123 adv.unadvertise(loads(dumps(msg)))
00124
00125 def test_do_advertise(self):
00126 proto = Protocol("hello")
00127 adv = Advertise(proto)
00128 topic = "/test_do_advertise"
00129 type = "std_msgs/String"
00130
00131 msg = {"op": "advertise", "topic": topic, "type": type}
00132 adv.advertise(loads(dumps(msg)))
00133 self.assertTrue(self.is_topic_published(topic))
00134 adv.unadvertise(loads(dumps(msg)))
00135 self.assertFalse(self.is_topic_published(topic))
00136
00137
00138 if __name__ == '__main__':
00139 import rostest
00140 rostest.rosrun(PKG, 'test_publish', TestAdvertise)