test_advertise.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 from time import sleep
00007 
00008 from std_msgs.msg import *
00009 
00010 from rosbridge_library.protocol import Protocol
00011 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
00012 from rosbridge_library.capabilities.advertise import Registration, Advertise
00013 from rosbridge_library.internal.publishers import manager, UNREGISTER_TIMEOUT
00014 from rosbridge_library.internal import ros_loader
00015 
00016 from json import loads, dumps
00017 
00018 
00019 class TestAdvertise(unittest.TestCase):
00020 
00021     def setUp(self):
00022         rospy.init_node("test_advertise")
00023         UNREGISTER_TIMEOUT = 1.0
00024 
00025     def is_topic_published(self, topicname):
00026         return topicname in dict(rospy.get_published_topics()).keys()
00027 
00028     def test_missing_arguments(self):
00029         proto = Protocol("hello")
00030         adv = Advertise(proto)
00031         msg = {"op": "advertise"}
00032         self.assertRaises(MissingArgumentException, adv.advertise, loads(dumps(msg)))
00033 
00034         msg = {"op": "advertise", "topic": "/jon"}
00035         self.assertRaises(MissingArgumentException, adv.advertise, loads(dumps(msg)))
00036 
00037         msg = {"op": "advertise", "type": "std_msgs/String"}
00038         self.assertRaises(MissingArgumentException, adv.advertise, loads(dumps(msg)))
00039 
00040     def test_invalid_arguments(self):
00041         proto = Protocol("hello")
00042         adv = Advertise(proto)
00043 
00044         msg = {"op": "advertise", "topic": 3, "type": "std_msgs/String"}
00045         self.assertRaises(InvalidArgumentException, adv.advertise, loads(dumps(msg)))
00046 
00047         msg = {"op": "advertise", "topic": "/jon", "type": 3}
00048         self.assertRaises(InvalidArgumentException, adv.advertise, loads(dumps(msg)))
00049 
00050     def test_invalid_msg_typestrings(self):
00051         invalid = ["", "/", "//", "///", "////", "/////", "bad", "stillbad",
00052        "not/better/still", "not//better//still", "not///better///still",
00053        "better/", "better//", "better///", "/better", "//better", "///better",
00054        "this\isbad", "\\"]
00055 
00056         proto = Protocol("hello")
00057         adv = Advertise(proto)
00058 
00059         for invalid_type in invalid:
00060             msg = {"op": "advertise", "topic": "/test_invalid_msg_typestrings",
00061                    "type": invalid_type}
00062             self.assertRaises(ros_loader.InvalidTypeStringException,
00063                               adv.advertise, loads(dumps(msg)))
00064 
00065     def test_invalid_msg_package(self):
00066         nonexistent = ["wangle_msgs/Jam", "whistleblower_msgs/Document",
00067         "sexual_harrassment_msgs/UnwantedAdvance", "coercion_msgs/Bribe",
00068         "airconditioning_msgs/Cold", "pr2thoughts_msgs/Escape"]
00069 
00070         proto = Protocol("hello")
00071         adv = Advertise(proto)
00072 
00073         for invalid_type in nonexistent:
00074             msg = {"op": "advertise", "topic": "/test_invalid_msg_package",
00075                    "type": invalid_type}
00076             self.assertRaises(ros_loader.InvalidPackageException,
00077                               adv.advertise, loads(dumps(msg)))
00078 
00079     def test_invalid_msg_module(self):
00080         no_msgs = ["roslib/Time", "roslib/Duration", "roslib/Header",
00081         "std_srvs/ConflictedMsg", "topic_tools/MessageMessage"]
00082 
00083         proto = Protocol("hello")
00084         adv = Advertise(proto)
00085 
00086         for invalid_type in no_msgs:
00087             msg = {"op": "advertise", "topic": "/test_invalid_msg_module",
00088                    "type": invalid_type}
00089             self.assertRaises(ros_loader.InvalidModuleException,
00090                               adv.advertise, loads(dumps(msg)))
00091 
00092     def test_invalid_msg_classes(self):
00093         nonexistent = ["roscpp/Time", "roscpp/Duration", "roscpp/Header",
00094         "rospy/Time", "rospy/Duration", "rospy/Header", "std_msgs/Spool",
00095         "geometry_msgs/Tetrahedron", "sensor_msgs/TelepathyUnit"]
00096 
00097         proto = Protocol("hello")
00098         adv = Advertise(proto)
00099 
00100         for invalid_type in nonexistent:
00101             msg = {"op": "advertise", "topic": "/test_invalid_msg_classes",
00102                    "type": invalid_type}
00103             self.assertRaises(ros_loader.InvalidClassException,
00104                               adv.advertise, loads(dumps(msg)))
00105 
00106     def test_valid_msg_classes(self):
00107         assortedmsgs = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
00108         "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
00109         "nav_msgs/OccupancyGrid", "geometry_msgs/Point32", "std_msgs/String",
00110         "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
00111         "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
00112         "sensor_msgs/PointCloud2"]
00113 
00114         proto = Protocol("hello")
00115         adv = Advertise(proto)
00116 
00117         for valid_type in assortedmsgs:
00118             msg = {"op": "advertise", "topic": "/" + valid_type,
00119                    "type": valid_type}
00120             adv.advertise(loads(dumps(msg)))
00121             adv.unadvertise(loads(dumps(msg)))
00122 
00123     def test_do_advertise(self):
00124         proto = Protocol("hello")
00125         adv = Advertise(proto)
00126         topic = "/test_do_advertise"
00127         type = "std_msgs/String"
00128 
00129         msg = {"op": "advertise", "topic": topic, "type": type}
00130         adv.advertise(loads(dumps(msg)))
00131         self.assertTrue(self.is_topic_published(topic))
00132         adv.unadvertise(loads(dumps(msg)))
00133         self.assertTrue(self.is_topic_published(topic))
00134         sleep(UNREGISTER_TIMEOUT*1.1)
00135         self.assertFalse(self.is_topic_published(topic))
00136 
00137 
00138 PKG = 'rosbridge_library'
00139 NAME = 'test_advertise'
00140 if __name__ == '__main__':
00141     rostest.unitrun(PKG, NAME, TestAdvertise)
00142 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Thu Jun 6 2019 21:51:43