test_advertise.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import rospy
4 import rosunit
5 import unittest
6 from time import sleep
7 
8 from std_msgs.msg import *
9 
10 from rosbridge_library.protocol import Protocol
11 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
12 from rosbridge_library.capabilities.advertise import Registration, Advertise
13 from rosbridge_library.internal.publishers import manager
14 from rosbridge_library.internal import ros_loader
15 
16 from json import loads, dumps
17 
18 
19 PKG = 'rosbridge_library'
20 NAME = 'test_advertise'
21 
22 
23 class TestAdvertise(unittest.TestCase):
24 
25  def setUp(self):
26  rospy.init_node(NAME)
27 
28  manager.unregister_timeout = 1.0
29 
30  def is_topic_published(self, topicname):
31  return topicname in dict(rospy.get_published_topics()).keys()
32 
34  proto = Protocol("hello")
35  adv = Advertise(proto)
36  msg = {"op": "advertise"}
37  self.assertRaises(MissingArgumentException, adv.advertise, loads(dumps(msg)))
38 
39  msg = {"op": "advertise", "topic": "/jon"}
40  self.assertRaises(MissingArgumentException, adv.advertise, loads(dumps(msg)))
41 
42  msg = {"op": "advertise", "type": "std_msgs/String"}
43  self.assertRaises(MissingArgumentException, adv.advertise, loads(dumps(msg)))
44 
46  proto = Protocol("hello")
47  adv = Advertise(proto)
48 
49  msg = {"op": "advertise", "topic": 3, "type": "std_msgs/String"}
50  self.assertRaises(InvalidArgumentException, adv.advertise, loads(dumps(msg)))
51 
52  msg = {"op": "advertise", "topic": "/jon", "type": 3}
53  self.assertRaises(InvalidArgumentException, adv.advertise, loads(dumps(msg)))
54 
56  invalid = ["", "/", "//", "///", "////", "/////", "bad", "stillbad",
57  "not/better/still", "not//better//still", "not///better///still",
58  "better/", "better//", "better///", "/better", "//better", "///better",
59  "this\isbad", "\\"]
60 
61  proto = Protocol("hello")
62  adv = Advertise(proto)
63 
64  for invalid_type in invalid:
65  msg = {"op": "advertise", "topic": "/test_invalid_msg_typestrings",
66  "type": invalid_type}
67  self.assertRaises(ros_loader.InvalidTypeStringException,
68  adv.advertise, loads(dumps(msg)))
69 
71  nonexistent = ["wangle_msgs/Jam", "whistleblower_msgs/Document",
72  "sexual_harrassment_msgs/UnwantedAdvance", "coercion_msgs/Bribe",
73  "airconditioning_msgs/Cold", "pr2thoughts_msgs/Escape"]
74 
75  proto = Protocol("hello")
76  adv = Advertise(proto)
77 
78  for invalid_type in nonexistent:
79  msg = {"op": "advertise", "topic": "/test_invalid_msg_package",
80  "type": invalid_type}
81  self.assertRaises(ros_loader.InvalidPackageException,
82  adv.advertise, loads(dumps(msg)))
83 
85  no_msgs = ["roslib/Time", "roslib/Duration", "roslib/Header",
86  "std_srvs/ConflictedMsg", "topic_tools/MessageMessage"]
87 
88  proto = Protocol("hello")
89  adv = Advertise(proto)
90 
91  for invalid_type in no_msgs:
92  msg = {"op": "advertise", "topic": "/test_invalid_msg_module",
93  "type": invalid_type}
94  self.assertRaises(ros_loader.InvalidModuleException,
95  adv.advertise, loads(dumps(msg)))
96 
98  nonexistent = ["roscpp/Time", "roscpp/Duration", "roscpp/Header",
99  "rospy/Time", "rospy/Duration", "rospy/Header", "std_msgs/Spool",
100  "geometry_msgs/Tetrahedron", "sensor_msgs/TelepathyUnit"]
101 
102  proto = Protocol("hello")
103  adv = Advertise(proto)
104 
105  for invalid_type in nonexistent:
106  msg = {"op": "advertise", "topic": "/test_invalid_msg_classes",
107  "type": invalid_type}
108  self.assertRaises(ros_loader.InvalidClassException,
109  adv.advertise, loads(dumps(msg)))
110 
112  assortedmsgs = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
113  "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
114  "nav_msgs/OccupancyGrid", "geometry_msgs/Point32", "std_msgs/String",
115  "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
116  "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
117  "sensor_msgs/PointCloud2"]
118 
119  proto = Protocol("hello")
120  adv = Advertise(proto)
121 
122  for valid_type in assortedmsgs:
123  msg = {"op": "advertise", "topic": "/" + valid_type,
124  "type": valid_type}
125  adv.advertise(loads(dumps(msg)))
126  adv.unadvertise(loads(dumps(msg)))
127 
128  def test_do_advertise(self):
129  proto = Protocol("hello")
130  adv = Advertise(proto)
131  topic = "/test_do_advertise"
132  type = "std_msgs/String"
133 
134  msg = {"op": "advertise", "topic": topic, "type": type}
135  adv.advertise(loads(dumps(msg)))
136  self.assertTrue(self.is_topic_published(topic))
137  adv.unadvertise(loads(dumps(msg)))
138  self.assertTrue(self.is_topic_published(topic))
139  sleep(manager.unregister_timeout * 1.1)
140  self.assertFalse(self.is_topic_published(topic))
141 
142 
143 if __name__ == '__main__':
144  rosunit.unitrun(PKG, NAME, TestAdvertise)
145 
def dumps(ob, sort_keys=False)
Definition: cbor.py:223


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri Oct 21 2022 02:45:18