00001
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006
00007 from std_msgs.msg import *
00008
00009 from rosbridge_library.protocol import Protocol
00010 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
00011 from rosbridge_library.capabilities.advertise import Registration, Advertise
00012 from rosbridge_library.internal.publishers import manager
00013 from rosbridge_library.internal import ros_loader
00014
00015 from json import loads, dumps
00016
00017
00018 class TestAdvertise(unittest.TestCase):
00019
00020 def setUp(self):
00021 rospy.init_node("test_advertise")
00022
00023 def is_topic_published(self, topicname):
00024 return topicname in dict(rospy.get_published_topics()).keys()
00025
00026 def test_missing_arguments(self):
00027 proto = Protocol("hello")
00028 adv = Advertise(proto)
00029 msg = {"op": "advertise"}
00030 self.assertRaises(MissingArgumentException, adv.advertise, loads(dumps(msg)))
00031
00032 msg = {"op": "advertise", "topic": "/jon"}
00033 self.assertRaises(MissingArgumentException, adv.advertise, loads(dumps(msg)))
00034
00035 msg = {"op": "advertise", "type": "std_msgs/String"}
00036 self.assertRaises(MissingArgumentException, adv.advertise, loads(dumps(msg)))
00037
00038 def test_invalid_arguments(self):
00039 proto = Protocol("hello")
00040 adv = Advertise(proto)
00041
00042 msg = {"op": "advertise", "topic": 3, "type": "std_msgs/String"}
00043 self.assertRaises(InvalidArgumentException, adv.advertise, loads(dumps(msg)))
00044
00045 msg = {"op": "advertise", "topic": "/jon", "type": 3}
00046 self.assertRaises(InvalidArgumentException, adv.advertise, loads(dumps(msg)))
00047
00048 def test_invalid_msg_typestrings(self):
00049 invalid = ["", "/", "//", "///", "////", "/////", "bad", "stillbad",
00050 "not/better/still", "not//better//still", "not///better///still",
00051 "better/", "better//", "better///", "/better", "//better", "///better",
00052 "this\isbad", "\\"]
00053
00054 proto = Protocol("hello")
00055 adv = Advertise(proto)
00056
00057 for invalid_type in invalid:
00058 msg = {"op": "advertise", "topic": "/test_invalid_msg_typestrings",
00059 "type": invalid_type}
00060 self.assertRaises(ros_loader.InvalidTypeStringException,
00061 adv.advertise, loads(dumps(msg)))
00062
00063 def test_invalid_msg_package(self):
00064 nonexistent = ["wangle_msgs/Jam", "whistleblower_msgs/Document",
00065 "sexual_harrassment_msgs/UnwantedAdvance", "coercion_msgs/Bribe",
00066 "airconditioning_msgs/Cold", "pr2thoughts_msgs/Escape"]
00067
00068 proto = Protocol("hello")
00069 adv = Advertise(proto)
00070
00071 for invalid_type in nonexistent:
00072 msg = {"op": "advertise", "topic": "/test_invalid_msg_package",
00073 "type": invalid_type}
00074 self.assertRaises(ros_loader.InvalidPackageException,
00075 adv.advertise, loads(dumps(msg)))
00076
00077 def test_invalid_msg_module(self):
00078 no_msgs = ["roslib/Time", "roslib/Duration", "roslib/Header",
00079 "std_srvs/ConflictedMsg", "topic_tools/MessageMessage"]
00080
00081 proto = Protocol("hello")
00082 adv = Advertise(proto)
00083
00084 for invalid_type in no_msgs:
00085 msg = {"op": "advertise", "topic": "/test_invalid_msg_module",
00086 "type": invalid_type}
00087 self.assertRaises(ros_loader.InvalidModuleException,
00088 adv.advertise, loads(dumps(msg)))
00089
00090 def test_invalid_msg_classes(self):
00091 nonexistent = ["roscpp/Time", "roscpp/Duration", "roscpp/Header",
00092 "rospy/Time", "rospy/Duration", "rospy/Header", "std_msgs/Spool",
00093 "geometry_msgs/Tetrahedron", "sensor_msgs/TelepathyUnit"]
00094
00095 proto = Protocol("hello")
00096 adv = Advertise(proto)
00097
00098 for invalid_type in nonexistent:
00099 msg = {"op": "advertise", "topic": "/test_invalid_msg_classes",
00100 "type": invalid_type}
00101 self.assertRaises(ros_loader.InvalidClassException,
00102 adv.advertise, loads(dumps(msg)))
00103
00104 def test_valid_msg_classes(self):
00105 assortedmsgs = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
00106 "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
00107 "nav_msgs/OccupancyGrid", "geometry_msgs/Point32", "std_msgs/String",
00108 "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
00109 "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
00110 "sensor_msgs/PointCloud2"]
00111
00112 proto = Protocol("hello")
00113 adv = Advertise(proto)
00114
00115 for valid_type in assortedmsgs:
00116 msg = {"op": "advertise", "topic": "/" + valid_type,
00117 "type": valid_type}
00118 adv.advertise(loads(dumps(msg)))
00119 adv.unadvertise(loads(dumps(msg)))
00120
00121 def test_do_advertise(self):
00122 proto = Protocol("hello")
00123 adv = Advertise(proto)
00124 topic = "/test_do_advertise"
00125 type = "std_msgs/String"
00126
00127 msg = {"op": "advertise", "topic": topic, "type": type}
00128 adv.advertise(loads(dumps(msg)))
00129 self.assertTrue(self.is_topic_published(topic))
00130 adv.unadvertise(loads(dumps(msg)))
00131 self.assertFalse(self.is_topic_published(topic))
00132
00133
00134 PKG = 'rosbridge_library'
00135 NAME = 'test_advertise'
00136 if __name__ == '__main__':
00137 rostest.unitrun(PKG, NAME, TestAdvertise)
00138