Go to the documentation of this file.00001
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006
00007 from time import sleep, time
00008
00009 from rosbridge_library.internal.publishers import *
00010 from rosbridge_library.internal import ros_loader
00011 from rosbridge_library.internal.message_conversion import *
00012 from std_msgs.msg import String, Int32
00013
00014
00015 class TestPublisherConsistencyListener(unittest.TestCase):
00016
00017 def setUp(self):
00018 rospy.init_node("test_publisher_consistency_listener")
00019
00020 def test_listener_timeout(self):
00021 """ See whether the listener can correctly time out """
00022 topic = "/test_listener_timeout"
00023 type = String
00024
00025 publisher = rospy.Publisher(topic, type)
00026
00027 listener = PublisherConsistencyListener()
00028 listener.attach(publisher)
00029
00030 self.assertFalse(listener.timed_out())
00031
00032 sleep(listener.timeout / 2.0)
00033
00034 self.assertFalse(listener.timed_out())
00035
00036 sleep(listener.timeout / 2.0 + 0.1)
00037
00038 self.assertTrue(listener.timed_out())
00039
00040 def test_listener_attach_detach(self):
00041 """ See whether the listener actually attaches and detaches itself """
00042 topic = "/test_listener_attach_detach"
00043 type = String
00044
00045 publisher = rospy.Publisher(topic, type)
00046 orig_publish = publisher.publish
00047
00048 listener = PublisherConsistencyListener()
00049 listener_publish = listener.publish_override
00050
00051 self.assertNotEqual(orig_publish, listener_publish)
00052 self.assertNotIn(listener, publisher.impl.subscriber_listeners)
00053
00054 listener.attach(publisher)
00055
00056 self.assertEqual(publisher.publish, listener_publish)
00057 self.assertNotEqual(publisher.publish, orig_publish)
00058 self.assertIn(listener, publisher.impl.subscriber_listeners)
00059
00060 listener.detach()
00061
00062 self.assertEqual(publisher.publish, orig_publish)
00063 self.assertNotEqual(publisher.publish, listener_publish)
00064 self.assertNotIn(listener, publisher.impl.subscriber_listeners)
00065
00066 def test_immediate_publish_fails_without(self):
00067 """ This test makes sure the failure case that the PublisherConsistency
00068 Listener is trying to solve, is indeed a failure case """
00069 topic = "/test_immediate_publish_fails_without"
00070 msg_class = String
00071
00072 msg = String()
00073 string = "why halo thar"
00074 msg.data = string
00075
00076 received = {"msg": None}
00077 def callback(msg):
00078 received["msg"] = msg
00079
00080 rospy.Subscriber(topic, msg_class, callback)
00081 publisher = rospy.Publisher(topic, msg_class)
00082 publisher.publish(msg)
00083 sleep(0.5)
00084
00085 self.assertNotEqual(received["msg"], msg)
00086 self.assertEqual(received["msg"], None)
00087
00088 def test_immediate_publish(self):
00089 """ This test makes sure the PublisherConsistencyListener is working"""
00090 topic = "/test_immediate_publish"
00091 msg_class = String
00092
00093 msg = String()
00094 string = "why halo thar"
00095 msg.data = string
00096
00097 received = {"msg": None}
00098 def callback(msg):
00099 print "Received a msg! ", msg
00100 received["msg"] = msg
00101
00102 rospy.Subscriber(topic, msg_class, callback)
00103
00104 class temp_listener(rospy.SubscribeListener):
00105 def peer_subscribe(self, topic_name, topic_publish, peer_publish):
00106 print "peer subscribe in temp listener"
00107
00108 listener = PublisherConsistencyListener()
00109 publisher = rospy.Publisher(topic, msg_class, temp_listener())
00110 listener.attach(publisher)
00111 publisher.publish(msg)
00112 sleep(0.5)
00113
00114 self.assertEqual(received["msg"], msg)
00115
00116 def test_immediate_multi_publish_fails_without(self):
00117 """ This test makes sure the failure case that the PublisherConsistency
00118 Listener is trying to solve, is indeed a failure case, even for large
00119 message buffers """
00120 topic = "/test_immediate_multi_publish_fails_without"
00121 msg_class = Int32
00122
00123 msgs = []
00124 for i in range(100):
00125 msg = Int32()
00126 msg.data = i
00127 msgs.append(msg)
00128
00129 received = {"msgs": []}
00130 def callback(msg):
00131 received["msgs"].append(msg)
00132
00133 rospy.Subscriber(topic, msg_class, callback)
00134
00135 publisher = rospy.Publisher(topic, msg_class)
00136 for msg in msgs:
00137 publisher.publish(msg)
00138 sleep(0.5)
00139
00140 self.assertEqual(len(received["msgs"]), 0)
00141 self.assertNotEqual(received["msgs"], msgs)
00142
00143 def test_immediate_multi_publish(self):
00144 """ This test makes sure the PublisherConsistencyListener is working
00145 even with a huge message buffer"""
00146 topic = "/test_immediate_multi_publish"
00147 msg_class = Int32
00148
00149 msgs = []
00150 for i in range(100):
00151 msg = Int32()
00152 msg.data = i
00153 msgs.append(msg)
00154
00155 received = {"msgs": []}
00156 def callback(msg):
00157 received["msgs"].append(msg)
00158
00159 rospy.Subscriber(topic, msg_class, callback)
00160
00161 listener = PublisherConsistencyListener()
00162 publisher = rospy.Publisher(topic, msg_class)
00163 listener.attach(publisher)
00164 for msg in msgs:
00165 publisher.publish(msg)
00166 sleep(0.5)
00167
00168 self.assertEqual(len(received["msgs"]), len(msgs))
00169 self.assertEqual(received["msgs"], msgs)
00170
00171
00172 PKG = 'rosbridge_library'
00173 NAME = 'test_publisher_consistency_listener'
00174 if __name__ == '__main__':
00175 rostest.unitrun(PKG, NAME, TestPublisherConsistencyListener)
00176