test_advertise.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 PKG = 'rosbridge_library'
00003 import roslib
00004 roslib.load_manifest(PKG)
00005 roslib.load_manifest("std_msgs")
00006 import rospy
00007 from rospy import get_published_topics
00008 
00009 from rosbridge_library.protocol import Protocol
00010 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
00011 from rosbridge_library.capabilities.advertise import Registration, Advertise
00012 from rosbridge_library.internal.publishers import manager
00013 from rosbridge_library.internal import ros_loader
00014 
00015 from std_msgs.msg import String
00016 
00017 from json import loads, dumps
00018 
00019 import unittest
00020 
00021 
00022 class TestAdvertise(unittest.TestCase):
00023 
00024     def setUp(self):
00025         rospy.init_node("test_advertise")
00026 
00027     def is_topic_published(self, topicname):
00028         return topicname in dict(get_published_topics()).keys()
00029 
00030     def test_missing_arguments(self):
00031         proto = Protocol("hello")
00032         adv = Advertise(proto)
00033         msg = {"op": "advertise"}
00034         self.assertRaises(MissingArgumentException, adv.advertise, loads(dumps(msg)))
00035 
00036         msg = {"op": "advertise", "topic": "/jon"}
00037         self.assertRaises(MissingArgumentException, adv.advertise, loads(dumps(msg)))
00038 
00039         msg = {"op": "advertise", "type": "std_msgs/String"}
00040         self.assertRaises(MissingArgumentException, adv.advertise, loads(dumps(msg)))
00041 
00042     def test_invalid_arguments(self):
00043         proto = Protocol("hello")
00044         adv = Advertise(proto)
00045 
00046         msg = {"op": "advertise", "topic": 3, "type": "std_msgs/String"}
00047         self.assertRaises(InvalidArgumentException, adv.advertise, loads(dumps(msg)))
00048 
00049         msg = {"op": "advertise", "topic": "/jon", "type": 3}
00050         self.assertRaises(InvalidArgumentException, adv.advertise, loads(dumps(msg)))
00051 
00052     def test_invalid_msg_typestrings(self):
00053         invalid = ["", "/", "//", "///", "////", "/////", "bad", "stillbad",
00054        "not/better/still", "not//better//still", "not///better///still",
00055        "better/", "better//", "better///", "/better", "//better", "///better",
00056        "this\isbad", "\\"]
00057 
00058         proto = Protocol("hello")
00059         adv = Advertise(proto)
00060 
00061         for invalid_type in invalid:
00062             msg = {"op": "advertise", "topic": "/test_invalid_msg_typestrings",
00063                    "type": invalid_type}
00064             self.assertRaises(ros_loader.InvalidTypeStringException,
00065                               adv.advertise, loads(dumps(msg)))
00066 
00067     def test_invalid_msg_package(self):
00068         nonexistent = ["wangle_msgs/Jam", "whistleblower_msgs/Document",
00069         "sexual_harrassment_msgs/UnwantedAdvance", "coercion_msgs/Bribe",
00070         "airconditioning_msgs/Cold", "pr2thoughts_msgs/Escape"]
00071 
00072         proto = Protocol("hello")
00073         adv = Advertise(proto)
00074 
00075         for invalid_type in nonexistent:
00076             msg = {"op": "advertise", "topic": "/test_invalid_msg_package",
00077                    "type": invalid_type}
00078             self.assertRaises(ros_loader.InvalidPackageException,
00079                               adv.advertise, loads(dumps(msg)))
00080 
00081     def test_invalid_msg_module(self):
00082         no_msgs = ["roslib/Time", "roslib/Duration", "roslib/Header",
00083         "std_srvs/ConflictedMsg", "topic_tools/MessageMessage"]
00084 
00085         proto = Protocol("hello")
00086         adv = Advertise(proto)
00087 
00088         for invalid_type in no_msgs:
00089             msg = {"op": "advertise", "topic": "/test_invalid_msg_module",
00090                    "type": invalid_type}
00091             self.assertRaises(ros_loader.InvalidModuleException,
00092                               adv.advertise, loads(dumps(msg)))
00093 
00094     def test_invalid_msg_classes(self):
00095         nonexistent = ["roscpp/Time", "roscpp/Duration", "roscpp/Header",
00096         "rospy/Time", "rospy/Duration", "rospy/Header", "std_msgs/Spool",
00097         "geometry_msgs/Tetrahedron", "sensor_msgs/TelepathyUnit"]
00098 
00099         proto = Protocol("hello")
00100         adv = Advertise(proto)
00101 
00102         for invalid_type in nonexistent:
00103             msg = {"op": "advertise", "topic": "/test_invalid_msg_classes",
00104                    "type": invalid_type}
00105             self.assertRaises(ros_loader.InvalidClassException,
00106                               adv.advertise, loads(dumps(msg)))
00107 
00108     def test_valid_msg_classes(self):
00109         assortedmsgs = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
00110         "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
00111         "nav_msgs/OccupancyGrid", "geometry_msgs/Point32", "std_msgs/String",
00112         "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
00113         "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
00114         "sensor_msgs/PointCloud2"]
00115 
00116         proto = Protocol("hello")
00117         adv = Advertise(proto)
00118 
00119         for valid_type in assortedmsgs:
00120             msg = {"op": "advertise", "topic": "/" + valid_type,
00121                    "type": valid_type}
00122             adv.advertise(loads(dumps(msg)))
00123             adv.unadvertise(loads(dumps(msg)))
00124 
00125     def test_do_advertise(self):
00126         proto = Protocol("hello")
00127         adv = Advertise(proto)
00128         topic = "/test_do_advertise"
00129         type = "std_msgs/String"
00130 
00131         msg = {"op": "advertise", "topic": topic, "type": type}
00132         adv.advertise(loads(dumps(msg)))
00133         self.assertTrue(self.is_topic_published(topic))
00134         adv.unadvertise(loads(dumps(msg)))
00135         self.assertFalse(self.is_topic_published(topic))
00136 
00137 
00138 if __name__ == '__main__':
00139     import rostest
00140     rostest.rosrun(PKG, 'test_publish', TestAdvertise)


rosbridge_test
Author(s): Jonathan Mace
autogenerated on Thu Jan 2 2014 11:54:04