00001
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 from time import sleep
00007
00008 from std_msgs.msg import *
00009
00010 from rosbridge_library.protocol import Protocol
00011 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
00012 from rosbridge_library.capabilities.advertise import Registration, Advertise
00013 from rosbridge_library.internal.publishers import manager, UNREGISTER_TIMEOUT
00014 from rosbridge_library.internal import ros_loader
00015
00016 from json import loads, dumps
00017
00018
class TestAdvertise(unittest.TestCase):
    """Tests for the rosbridge ``Advertise`` capability.

    Each test builds a fresh ``Protocol``/``Advertise`` pair and feeds it
    ``advertise`` (and, where relevant, ``unadvertise``) messages. Every
    message is round-tripped through ``dumps``/``loads`` before being handed
    to the capability, so it is passed in JSON-decoded form.
    """

    def setUp(self):
        """Initialise the ROS node used by every test in this case."""
        rospy.init_node("test_advertise")
        # NOTE: the imported UNREGISTER_TIMEOUT is used as-is. Assigning to
        # that name inside this method would only create an unused local and
        # would not change the timeout applied by the publisher manager.

    def is_topic_published(self, topicname):
        """Return True if ``topicname`` is among the published topics.

        ``rospy.get_published_topics()`` is converted to a dict, so the
        membership test is performed against the first element of each
        entry it returns.
        """
        return topicname in dict(rospy.get_published_topics())

    def _new_advertise(self):
        """Return an ``Advertise`` capability bound to a fresh ``Protocol``."""
        return Advertise(Protocol("hello"))

    def _assert_advertise_raises(self, exception, adv, msg):
        """Assert that advertising the JSON round-tripped ``msg`` raises ``exception``."""
        self.assertRaises(exception, adv.advertise, loads(dumps(msg)))

    def _assert_types_rejected(self, exception, topic, type_strings):
        """Assert that advertising ``topic`` with each type string raises ``exception``.

        A single ``Advertise`` instance is reused for all type strings.
        """
        adv = self._new_advertise()
        for type_string in type_strings:
            msg = {"op": "advertise", "topic": topic, "type": type_string}
            self._assert_advertise_raises(exception, adv, msg)

    def test_missing_arguments(self):
        """Advertise without ``topic`` and/or ``type`` must raise MissingArgumentException."""
        adv = self._new_advertise()
        incomplete_msgs = [
            {"op": "advertise"},
            {"op": "advertise", "topic": "/jon"},
            {"op": "advertise", "type": "std_msgs/String"},
        ]
        for msg in incomplete_msgs:
            self._assert_advertise_raises(MissingArgumentException, adv, msg)

    def test_invalid_arguments(self):
        """A non-string ``topic`` or ``type`` must raise InvalidArgumentException."""
        adv = self._new_advertise()
        wrongly_typed_msgs = [
            {"op": "advertise", "topic": 3, "type": "std_msgs/String"},
            {"op": "advertise", "topic": "/jon", "type": 3},
        ]
        for msg in wrongly_typed_msgs:
            self._assert_advertise_raises(InvalidArgumentException, adv, msg)

    def test_invalid_msg_typestrings(self):
        """Malformed type strings must raise InvalidTypeStringException."""
        # Backslashes are escaped explicitly: "this\\isbad" is the characters
        # this, one backslash, isbad; "\\" is a single backslash.
        invalid = ["", "/", "//", "///", "////", "/////", "bad", "stillbad",
                   "not/better/still", "not//better//still",
                   "not///better///still", "better/", "better//", "better///",
                   "/better", "//better", "///better", "this\\isbad", "\\"]

        self._assert_types_rejected(ros_loader.InvalidTypeStringException,
                                    "/test_invalid_msg_typestrings", invalid)

    def test_invalid_msg_package(self):
        """Types naming a nonexistent package must raise InvalidPackageException."""
        nonexistent = ["wangle_msgs/Jam", "whistleblower_msgs/Document",
                       "sexual_harrassment_msgs/UnwantedAdvance",
                       "coercion_msgs/Bribe", "airconditioning_msgs/Cold",
                       "pr2thoughts_msgs/Escape"]

        self._assert_types_rejected(ros_loader.InvalidPackageException,
                                    "/test_invalid_msg_package", nonexistent)

    def test_invalid_msg_module(self):
        """Types whose package yields no msg module must raise InvalidModuleException."""
        no_msgs = ["roslib/Time", "roslib/Duration", "roslib/Header",
                   "std_srvs/ConflictedMsg", "topic_tools/MessageMessage"]

        self._assert_types_rejected(ros_loader.InvalidModuleException,
                                    "/test_invalid_msg_module", no_msgs)

    def test_invalid_msg_classes(self):
        """Types naming a nonexistent message class must raise InvalidClassException."""
        nonexistent = ["roscpp/Time", "roscpp/Duration", "roscpp/Header",
                       "rospy/Time", "rospy/Duration", "rospy/Header",
                       "std_msgs/Spool", "geometry_msgs/Tetrahedron",
                       "sensor_msgs/TelepathyUnit"]

        self._assert_types_rejected(ros_loader.InvalidClassException,
                                    "/test_invalid_msg_classes", nonexistent)

    def test_valid_msg_classes(self):
        """Advertising then unadvertising assorted valid types must not raise."""
        assortedmsgs = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
                        "geometry_msgs/WrenchStamped",
                        "stereo_msgs/DisparityImage", "nav_msgs/OccupancyGrid",
                        "geometry_msgs/Point32", "std_msgs/String",
                        "trajectory_msgs/JointTrajectoryPoint",
                        "diagnostic_msgs/KeyValue",
                        "visualization_msgs/InteractiveMarkerUpdate",
                        "nav_msgs/GridCells", "sensor_msgs/PointCloud2"]

        adv = self._new_advertise()

        for valid_type in assortedmsgs:
            # The topic name is derived from the type, e.g. "/std_msgs/String".
            msg = {"op": "advertise", "topic": "/" + valid_type,
                   "type": valid_type}
            adv.advertise(loads(dumps(msg)))
            # The same message dict is reused for the unadvertise call.
            adv.unadvertise(loads(dumps(msg)))

    def test_do_advertise(self):
        """A topic is published after advertise and disappears after the timeout.

        Immediately after ``unadvertise`` the topic is still expected to be
        listed; it is only expected to be gone once slightly more than
        ``UNREGISTER_TIMEOUT`` seconds have elapsed.
        """
        adv = self._new_advertise()
        topic = "/test_do_advertise"
        msg_type = "std_msgs/String"

        msg = {"op": "advertise", "topic": topic, "type": msg_type}
        adv.advertise(loads(dumps(msg)))
        self.assertTrue(self.is_topic_published(topic))

        adv.unadvertise(loads(dumps(msg)))
        # Still listed right after unadvertise.
        self.assertTrue(self.is_topic_published(topic))

        # Wait 10% longer than the timeout before checking again.
        sleep(UNREGISTER_TIMEOUT*1.1)
        self.assertFalse(self.is_topic_published(topic))
00136
00137
# Package and test name passed to rostest.unitrun below.
PKG = 'rosbridge_library'
NAME = 'test_advertise'

if __name__ == '__main__':
    # Run the test case through rostest only when executed as a script,
    # not when this module is imported.
    rostest.unitrun(package=PKG, test_name=NAME, test=TestAdvertise)
00142