test_publisher_consistency_listener.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from __future__ import print_function
3 import sys
4 import rospy
5 import rosunit
6 import unittest
7 
8 from time import sleep, time
9 
11 from rosbridge_library.internal import ros_loader
13 from std_msgs.msg import String, Int32
14 
15 
16 PKG = 'rosbridge_library'
17 NAME = 'test_publisher_consistency_listener'
18 
19 
20 class TestPublisherConsistencyListener(unittest.TestCase):
21 
22  def setUp(self):
23  rospy.init_node(NAME)
24 
26  """ See whether the listener can correctly time out """
27  topic = "/test_listener_timeout"
28  type = String
29 
30  publisher = rospy.Publisher(topic, type)
31 
32  listener = PublisherConsistencyListener()
33  listener.attach(publisher)
34 
35  self.assertFalse(listener.timed_out())
36 
37  sleep(listener.timeout / 2.0)
38 
39  self.assertFalse(listener.timed_out())
40 
41  sleep(listener.timeout / 2.0 + 0.1)
42 
43  self.assertTrue(listener.timed_out())
44 
46  """ See whether the listener actually attaches and detaches itself """
47  topic = "/test_listener_attach_detach"
48  type = String
49 
50  publisher = rospy.Publisher(topic, type)
51  orig_publish = publisher.publish
52 
53  listener = PublisherConsistencyListener()
54  listener_publish = listener.publish_override
55 
56  self.assertNotEqual(orig_publish, listener_publish)
57  self.assertNotIn(listener, publisher.impl.subscriber_listeners)
58 
59  listener.attach(publisher)
60 
61  self.assertEqual(publisher.publish, listener_publish)
62  self.assertNotEqual(publisher.publish, orig_publish)
63  self.assertIn(listener, publisher.impl.subscriber_listeners)
64 
65  listener.detach()
66 
67  self.assertEqual(publisher.publish, orig_publish)
68  self.assertNotEqual(publisher.publish, listener_publish)
69  self.assertNotIn(listener, publisher.impl.subscriber_listeners)
70 
72  """ This test makes sure the failure case that the PublisherConsistency
73  Listener is trying to solve, is indeed a failure case """
74  topic = "/test_immediate_publish_fails_without"
75  msg_class = String
76 
77  msg = String()
78  string = "why halo thar"
79  msg.data = string
80 
81  received = {"msg": None}
82  def callback(msg):
83  received["msg"] = msg
84 
85  rospy.Subscriber(topic, msg_class, callback)
86  publisher = rospy.Publisher(topic, msg_class)
87  publisher.publish(msg)
88  sleep(0.5)
89 
90  self.assertNotEqual(received["msg"], msg)
91  self.assertEqual(received["msg"], None)
92 
94  """ This test makes sure the PublisherConsistencyListener is working"""
95  topic = "/test_immediate_publish"
96  msg_class = String
97 
98  msg = String()
99  string = "why halo thar"
100  msg.data = string
101 
102  received = {"msg": None}
103  def callback(msg):
104  print("Received a msg! ", msg)
105  received["msg"] = msg
106 
107  rospy.Subscriber(topic, msg_class, callback)
108 
109  class temp_listener(rospy.SubscribeListener):
110  def peer_subscribe(self, topic_name, topic_publish, peer_publish):
111  print("peer subscribe in temp listener")
112 
113  listener = PublisherConsistencyListener()
114  publisher = rospy.Publisher(topic, msg_class, temp_listener())
115  listener.attach(publisher)
116  publisher.publish(msg)
117  sleep(0.5)
118 
119  self.assertEqual(received["msg"], msg)
120 
122  """ This test makes sure the failure case that the PublisherConsistency
123  Listener is trying to solve, is indeed a failure case, even for large
124  message buffers """
125  topic = "/test_immediate_multi_publish_fails_without"
126  msg_class = Int32
127 
128  msgs = []
129  for i in range(100):
130  msg = Int32()
131  msg.data = i
132  msgs.append(msg)
133 
134  received = {"msgs": []}
135  def callback(msg):
136  received["msgs"].append(msg)
137 
138  rospy.Subscriber(topic, msg_class, callback)
139 
140  publisher = rospy.Publisher(topic, msg_class)
141  for msg in msgs:
142  publisher.publish(msg)
143  sleep(0.5)
144 
145  self.assertEqual(len(received["msgs"]), 0)
146  self.assertNotEqual(received["msgs"], msgs)
147 
149  """ This test makes sure the PublisherConsistencyListener is working
150  even with a huge message buffer"""
151  topic = "/test_immediate_multi_publish"
152  msg_class = Int32
153 
154  msgs = []
155  for i in range(100):
156  msg = Int32()
157  msg.data = i
158  msgs.append(msg)
159 
160  received = {"msgs": []}
161  def callback(msg):
162  received["msgs"].append(msg)
163 
164  rospy.Subscriber(topic, msg_class, callback)
165 
166  listener = PublisherConsistencyListener()
167  publisher = rospy.Publisher(topic, msg_class)
168  listener.attach(publisher)
169  for msg in msgs:
170  publisher.publish(msg)
171  sleep(0.5)
172 
173  self.assertEqual(len(received["msgs"]), len(msgs))
174  self.assertEqual(received["msgs"], msgs)
175 
176 
177 if __name__ == '__main__':
178  rosunit.unitrun(PKG, NAME, TestPublisherConsistencyListener)
179 
msg
rosbridge_library.internal.publishers
Definition: publishers.py:1
test.internal.publishers.test_publisher_consistency_listener.TestPublisherConsistencyListener.test_immediate_multi_publish
def test_immediate_multi_publish(self)
Definition: test_publisher_consistency_listener.py:148
test.internal.publishers.test_publisher_consistency_listener.TestPublisherConsistencyListener.setUp
def setUp(self)
Definition: test_publisher_consistency_listener.py:22
rosbridge_library.internal.publishers.PublisherConsistencyListener
Definition: publishers.py:44
rosbridge_library.internal.message_conversion
Definition: message_conversion.py:1
test.internal.publishers.test_publisher_consistency_listener.TestPublisherConsistencyListener.test_immediate_publish
def test_immediate_publish(self)
Definition: test_publisher_consistency_listener.py:93
test.internal.publishers.test_publisher_consistency_listener.TestPublisherConsistencyListener.test_immediate_publish_fails_without
def test_immediate_publish_fails_without(self)
Definition: test_publisher_consistency_listener.py:71
test.internal.publishers.test_publisher_consistency_listener.TestPublisherConsistencyListener.test_listener_timeout
def test_listener_timeout(self)
Definition: test_publisher_consistency_listener.py:25
test.internal.publishers.test_publisher_consistency_listener.TestPublisherConsistencyListener.test_immediate_multi_publish_fails_without
def test_immediate_multi_publish_fails_without(self)
Definition: test_publisher_consistency_listener.py:121
test.internal.publishers.test_publisher_consistency_listener.TestPublisherConsistencyListener.test_listener_attach_detach
def test_listener_attach_detach(self)
Definition: test_publisher_consistency_listener.py:45
rosbridge_library.internal
Definition: src/rosbridge_library/internal/__init__.py:1
test.internal.publishers.test_publisher_consistency_listener.TestPublisherConsistencyListener
Definition: test_publisher_consistency_listener.py:20


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Tue Oct 3 2023 02:12:45