test_advertise.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import rospy
4 import rostest
5 import unittest
6 from time import sleep
7 
8 from std_msgs.msg import *
9 
10 from rosbridge_library.protocol import Protocol
11 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
12 from rosbridge_library.capabilities.advertise import Registration, Advertise
13 from rosbridge_library.internal.publishers import manager
14 from rosbridge_library.internal import ros_loader
15 
16 from json import loads, dumps
17 
18 
class TestAdvertise(unittest.TestCase):
    """Tests for the rosbridge ``Advertise`` capability.

    Each test builds a fresh ``Protocol("hello")`` / ``Advertise`` pair and
    feeds it ``advertise`` messages, checking either the exception raised for
    a malformed request or that the topic shows up in
    ``rospy.get_published_topics()``.
    """

    # NOTE(review): the ``def`` headers of the first seven test methods were
    # missing from the listing this class was recovered from. The four
    # ``test_invalid_msg_*`` names are taken from the topic strings used in
    # their bodies; ``test_missing_arguments``, ``test_invalid_arguments`` and
    # ``test_valid_msg_classes`` are reconstructed from what each body
    # asserts -- confirm against the upstream file.

    def setUp(self):
        """Initialise the ROS node and set the publisher unregister timeout to 1.0."""
        rospy.init_node("test_advertise")
        manager.unregister_timeout = 1.0

    def is_topic_published(self, topicname):
        """Return True if ``topicname`` is listed by ``rospy.get_published_topics()``."""
        # Membership on the dict tests its keys; no need for ``.keys()``.
        return topicname in dict(rospy.get_published_topics())

    def _assert_advertise_raises(self, expected_exception, topic, msg_types):
        """Assert that advertising ``topic`` with each type raises ``expected_exception``.

        A single ``Advertise`` instance is reused for every entry in
        ``msg_types``, as the individual tests did before this helper existed.
        """
        adv = Advertise(Protocol("hello"))
        for msg_type in msg_types:
            msg = {"op": "advertise", "topic": topic, "type": msg_type}
            # The message is round-tripped through JSON before being handed
            # to the capability (presumably to mimic a wire message).
            self.assertRaises(expected_exception,
                              adv.advertise, loads(dumps(msg)))

    def test_missing_arguments(self):
        """A request lacking ``topic`` and/or ``type`` raises MissingArgumentException."""
        adv = Advertise(Protocol("hello"))
        incomplete_msgs = [
            {"op": "advertise"},
            {"op": "advertise", "topic": "/jon"},
            {"op": "advertise", "type": "std_msgs/String"},
        ]
        for msg in incomplete_msgs:
            self.assertRaises(MissingArgumentException,
                              adv.advertise, loads(dumps(msg)))

    def test_invalid_arguments(self):
        """A non-string ``topic`` or ``type`` raises InvalidArgumentException."""
        adv = Advertise(Protocol("hello"))
        wrongly_typed_msgs = [
            {"op": "advertise", "topic": 3, "type": "std_msgs/String"},
            {"op": "advertise", "topic": "/jon", "type": 3},
        ]
        for msg in wrongly_typed_msgs:
            self.assertRaises(InvalidArgumentException,
                              adv.advertise, loads(dumps(msg)))

    def test_invalid_msg_typestrings(self):
        """Malformed type strings raise ros_loader.InvalidTypeStringException."""
        # Backslashes are escaped explicitly: "\i" is not a valid escape
        # sequence and triggers a warning on current Python versions, while
        # "\\i" yields the same two characters without one.
        invalid = ["", "/", "//", "///", "////", "/////", "bad", "stillbad",
                   "not/better/still", "not//better//still",
                   "not///better///still", "better/", "better//", "better///",
                   "/better", "//better", "///better", "this\\isbad", "\\"]
        self._assert_advertise_raises(ros_loader.InvalidTypeStringException,
                                      "/test_invalid_msg_typestrings", invalid)

    def test_invalid_msg_package(self):
        """Types in the listed package names raise ros_loader.InvalidPackageException."""
        nonexistent = ["wangle_msgs/Jam", "whistleblower_msgs/Document",
                       "sexual_harrassment_msgs/UnwantedAdvance",
                       "coercion_msgs/Bribe", "airconditioning_msgs/Cold",
                       "pr2thoughts_msgs/Escape"]
        self._assert_advertise_raises(ros_loader.InvalidPackageException,
                                      "/test_invalid_msg_package", nonexistent)

    def test_invalid_msg_module(self):
        """The listed package/type pairs raise ros_loader.InvalidModuleException."""
        no_msgs = ["roslib/Time", "roslib/Duration", "roslib/Header",
                   "std_srvs/ConflictedMsg", "topic_tools/MessageMessage"]
        self._assert_advertise_raises(ros_loader.InvalidModuleException,
                                      "/test_invalid_msg_module", no_msgs)

    def test_invalid_msg_classes(self):
        """The listed message class names raise ros_loader.InvalidClassException."""
        nonexistent = ["roscpp/Time", "roscpp/Duration", "roscpp/Header",
                       "rospy/Time", "rospy/Duration", "rospy/Header",
                       "std_msgs/Spool", "geometry_msgs/Tetrahedron",
                       "sensor_msgs/TelepathyUnit"]
        self._assert_advertise_raises(ros_loader.InvalidClassException,
                                      "/test_invalid_msg_classes", nonexistent)

    def test_valid_msg_classes(self):
        """Advertising then unadvertising each listed type completes without raising."""
        assortedmsgs = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
                        "geometry_msgs/WrenchStamped",
                        "stereo_msgs/DisparityImage", "nav_msgs/OccupancyGrid",
                        "geometry_msgs/Point32", "std_msgs/String",
                        "trajectory_msgs/JointTrajectoryPoint",
                        "diagnostic_msgs/KeyValue",
                        "visualization_msgs/InteractiveMarkerUpdate",
                        "nav_msgs/GridCells", "sensor_msgs/PointCloud2"]

        adv = Advertise(Protocol("hello"))
        for valid_type in assortedmsgs:
            # The topic name is derived from the type so every type gets its
            # own topic; the same message dict is passed to both calls.
            msg = {"op": "advertise", "topic": "/" + valid_type,
                   "type": valid_type}
            adv.advertise(loads(dumps(msg)))
            adv.unadvertise(loads(dumps(msg)))

    def test_do_advertise(self):
        """Check topic visibility across an advertise/unadvertise cycle.

        The topic must be published after ``advertise``, still be published
        immediately after ``unadvertise``, and be gone once slightly more
        than ``manager.unregister_timeout`` seconds have elapsed.
        """
        adv = Advertise(Protocol("hello"))
        topic = "/test_do_advertise"
        msg_type = "std_msgs/String"  # renamed from ``type`` to avoid shadowing the builtin

        msg = {"op": "advertise", "topic": topic, "type": msg_type}
        adv.advertise(loads(dumps(msg)))
        self.assertTrue(self.is_topic_published(topic))
        adv.unadvertise(loads(dumps(msg)))
        # Still expected to be visible right after unadvertise ...
        self.assertTrue(self.is_topic_published(topic))
        # ... and gone after waiting 10% longer than the unregister timeout.
        sleep(manager.unregister_timeout * 1.1)
        self.assertFalse(self.is_topic_published(topic))
136 
137 
# Package name and test name passed to rostest when this file is run directly.
PKG = "rosbridge_library"
NAME = "test_advertise"

if __name__ == "__main__":
    # Hand the TestAdvertise case to rostest's unit-test runner.
    rostest.unitrun(PKG, NAME, TestAdvertise)
142 
def dumps(ob, sort_keys=False)
Definition: cbor.py:223


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri May 10 2019 02:17:02