Go to the documentation of this file.00001
00002 from __future__ import print_function
00003 import sys
00004 import rospy
00005 import rostest
00006 import unittest
00007
00008 from time import sleep, time
00009
00010 from rosbridge_library.internal.publishers import *
00011 from rosbridge_library.internal import ros_loader
00012 from rosbridge_library.internal.message_conversion import *
00013 from std_msgs.msg import String, Int32
00014
00015
00016 class TestPublisherConsistencyListener(unittest.TestCase):
00017
00018 def setUp(self):
00019 rospy.init_node("test_publisher_consistency_listener")
00020
00021 def test_listener_timeout(self):
00022 """ See whether the listener can correctly time out """
00023 topic = "/test_listener_timeout"
00024 type = String
00025
00026 publisher = rospy.Publisher(topic, type)
00027
00028 listener = PublisherConsistencyListener()
00029 listener.attach(publisher)
00030
00031 self.assertFalse(listener.timed_out())
00032
00033 sleep(listener.timeout / 2.0)
00034
00035 self.assertFalse(listener.timed_out())
00036
00037 sleep(listener.timeout / 2.0 + 0.1)
00038
00039 self.assertTrue(listener.timed_out())
00040
00041 def test_listener_attach_detach(self):
00042 """ See whether the listener actually attaches and detaches itself """
00043 topic = "/test_listener_attach_detach"
00044 type = String
00045
00046 publisher = rospy.Publisher(topic, type)
00047 orig_publish = publisher.publish
00048
00049 listener = PublisherConsistencyListener()
00050 listener_publish = listener.publish_override
00051
00052 self.assertNotEqual(orig_publish, listener_publish)
00053 self.assertNotIn(listener, publisher.impl.subscriber_listeners)
00054
00055 listener.attach(publisher)
00056
00057 self.assertEqual(publisher.publish, listener_publish)
00058 self.assertNotEqual(publisher.publish, orig_publish)
00059 self.assertIn(listener, publisher.impl.subscriber_listeners)
00060
00061 listener.detach()
00062
00063 self.assertEqual(publisher.publish, orig_publish)
00064 self.assertNotEqual(publisher.publish, listener_publish)
00065 self.assertNotIn(listener, publisher.impl.subscriber_listeners)
00066
00067 def test_immediate_publish_fails_without(self):
00068 """ This test makes sure the failure case that the PublisherConsistency
00069 Listener is trying to solve, is indeed a failure case """
00070 topic = "/test_immediate_publish_fails_without"
00071 msg_class = String
00072
00073 msg = String()
00074 string = "why halo thar"
00075 msg.data = string
00076
00077 received = {"msg": None}
00078 def callback(msg):
00079 received["msg"] = msg
00080
00081 rospy.Subscriber(topic, msg_class, callback)
00082 publisher = rospy.Publisher(topic, msg_class)
00083 publisher.publish(msg)
00084 sleep(0.5)
00085
00086 self.assertNotEqual(received["msg"], msg)
00087 self.assertEqual(received["msg"], None)
00088
00089 def test_immediate_publish(self):
00090 """ This test makes sure the PublisherConsistencyListener is working"""
00091 topic = "/test_immediate_publish"
00092 msg_class = String
00093
00094 msg = String()
00095 string = "why halo thar"
00096 msg.data = string
00097
00098 received = {"msg": None}
00099 def callback(msg):
00100 print("Received a msg! ", msg)
00101 received["msg"] = msg
00102
00103 rospy.Subscriber(topic, msg_class, callback)
00104
00105 class temp_listener(rospy.SubscribeListener):
00106 def peer_subscribe(self, topic_name, topic_publish, peer_publish):
00107 print("peer subscribe in temp listener")
00108
00109 listener = PublisherConsistencyListener()
00110 publisher = rospy.Publisher(topic, msg_class, temp_listener())
00111 listener.attach(publisher)
00112 publisher.publish(msg)
00113 sleep(0.5)
00114
00115 self.assertEqual(received["msg"], msg)
00116
00117 def test_immediate_multi_publish_fails_without(self):
00118 """ This test makes sure the failure case that the PublisherConsistency
00119 Listener is trying to solve, is indeed a failure case, even for large
00120 message buffers """
00121 topic = "/test_immediate_multi_publish_fails_without"
00122 msg_class = Int32
00123
00124 msgs = []
00125 for i in range(100):
00126 msg = Int32()
00127 msg.data = i
00128 msgs.append(msg)
00129
00130 received = {"msgs": []}
00131 def callback(msg):
00132 received["msgs"].append(msg)
00133
00134 rospy.Subscriber(topic, msg_class, callback)
00135
00136 publisher = rospy.Publisher(topic, msg_class)
00137 for msg in msgs:
00138 publisher.publish(msg)
00139 sleep(0.5)
00140
00141 self.assertEqual(len(received["msgs"]), 0)
00142 self.assertNotEqual(received["msgs"], msgs)
00143
00144 def test_immediate_multi_publish(self):
00145 """ This test makes sure the PublisherConsistencyListener is working
00146 even with a huge message buffer"""
00147 topic = "/test_immediate_multi_publish"
00148 msg_class = Int32
00149
00150 msgs = []
00151 for i in range(100):
00152 msg = Int32()
00153 msg.data = i
00154 msgs.append(msg)
00155
00156 received = {"msgs": []}
00157 def callback(msg):
00158 received["msgs"].append(msg)
00159
00160 rospy.Subscriber(topic, msg_class, callback)
00161
00162 listener = PublisherConsistencyListener()
00163 publisher = rospy.Publisher(topic, msg_class)
00164 listener.attach(publisher)
00165 for msg in msgs:
00166 publisher.publish(msg)
00167 sleep(0.5)
00168
00169 self.assertEqual(len(received["msgs"]), len(msgs))
00170 self.assertEqual(received["msgs"], msgs)
00171
00172
00173 PKG = 'rosbridge_library'
00174 NAME = 'test_publisher_consistency_listener'
00175 if __name__ == '__main__':
00176 rostest.unitrun(PKG, NAME, TestPublisherConsistencyListener)
00177