test_publisher_consistency_listener.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 
00007 from time import sleep, time
00008 
00009 from rosbridge_library.internal.publishers import *
00010 from rosbridge_library.internal import ros_loader
00011 from rosbridge_library.internal.message_conversion import *
00012 from std_msgs.msg import String, Int32
00013 
00014 
00015 class TestPublisherConsistencyListener(unittest.TestCase):
00016 
00017     def setUp(self):
00018         rospy.init_node("test_publisher_consistency_listener")
00019 
00020     def test_listener_timeout(self):
00021         """ See whether the listener can correctly time out """
00022         topic = "/test_listener_timeout"
00023         type = String
00024 
00025         publisher = rospy.Publisher(topic, type)
00026 
00027         listener = PublisherConsistencyListener()
00028         listener.attach(publisher)
00029 
00030         self.assertFalse(listener.timed_out())
00031 
00032         sleep(listener.timeout / 2.0)
00033 
00034         self.assertFalse(listener.timed_out())
00035 
00036         sleep(listener.timeout / 2.0 + 0.1)
00037 
00038         self.assertTrue(listener.timed_out())
00039 
00040     def test_listener_attach_detach(self):
00041         """ See whether the listener actually attaches and detaches itself """
00042         topic = "/test_listener_attach_detach"
00043         type = String
00044 
00045         publisher = rospy.Publisher(topic, type)
00046         orig_publish = publisher.publish
00047 
00048         listener = PublisherConsistencyListener()
00049         listener_publish = listener.publish_override
00050 
00051         self.assertNotEqual(orig_publish, listener_publish)
00052         self.assertNotIn(listener, publisher.impl.subscriber_listeners)
00053 
00054         listener.attach(publisher)
00055 
00056         self.assertEqual(publisher.publish, listener_publish)
00057         self.assertNotEqual(publisher.publish, orig_publish)
00058         self.assertIn(listener, publisher.impl.subscriber_listeners)
00059 
00060         listener.detach()
00061 
00062         self.assertEqual(publisher.publish, orig_publish)
00063         self.assertNotEqual(publisher.publish, listener_publish)
00064         self.assertNotIn(listener, publisher.impl.subscriber_listeners)
00065 
00066     def test_immediate_publish_fails_without(self):
00067         """ This test makes sure the failure case that the PublisherConsistency
00068         Listener is trying to solve, is indeed a failure case """
00069         topic = "/test_immediate_publish_fails_without"
00070         msg_class = String
00071 
00072         msg = String()
00073         string = "why halo thar"
00074         msg.data = string
00075 
00076         received = {"msg": None}
00077         def callback(msg):
00078             received["msg"] = msg
00079 
00080         rospy.Subscriber(topic, msg_class, callback)
00081         publisher = rospy.Publisher(topic, msg_class)
00082         publisher.publish(msg)
00083         sleep(0.5)
00084 
00085         self.assertNotEqual(received["msg"], msg)
00086         self.assertEqual(received["msg"], None)
00087 
00088     def test_immediate_publish(self):
00089         """ This test makes sure the PublisherConsistencyListener is working"""
00090         topic = "/test_immediate_publish"
00091         msg_class = String
00092 
00093         msg = String()
00094         string = "why halo thar"
00095         msg.data = string
00096 
00097         received = {"msg": None}
00098         def callback(msg):
00099             print "Received a msg! ", msg
00100             received["msg"] = msg
00101 
00102         rospy.Subscriber(topic, msg_class, callback)
00103 
00104         class temp_listener(rospy.SubscribeListener):
00105             def peer_subscribe(self, topic_name, topic_publish, peer_publish):
00106                 print "peer subscribe in temp listener"
00107 
00108         listener = PublisherConsistencyListener()
00109         publisher = rospy.Publisher(topic, msg_class, temp_listener())
00110         listener.attach(publisher)
00111         publisher.publish(msg)
00112         sleep(0.5)
00113 
00114         self.assertEqual(received["msg"], msg)
00115 
00116     def test_immediate_multi_publish_fails_without(self):
00117         """ This test makes sure the failure case that the PublisherConsistency
00118         Listener is trying to solve, is indeed a failure case, even for large
00119         message buffers """
00120         topic = "/test_immediate_multi_publish_fails_without"
00121         msg_class = Int32
00122 
00123         msgs = []
00124         for i in range(100):
00125             msg = Int32()
00126             msg.data = i
00127             msgs.append(msg)
00128 
00129         received = {"msgs": []}
00130         def callback(msg):
00131             received["msgs"].append(msg)
00132 
00133         rospy.Subscriber(topic, msg_class, callback)
00134 
00135         publisher = rospy.Publisher(topic, msg_class)
00136         for msg in msgs:
00137             publisher.publish(msg)
00138         sleep(0.5)
00139 
00140         self.assertEqual(len(received["msgs"]), 0)
00141         self.assertNotEqual(received["msgs"], msgs)
00142 
00143     def test_immediate_multi_publish(self):
00144         """ This test makes sure the PublisherConsistencyListener is working
00145         even with a huge message buffer"""
00146         topic = "/test_immediate_multi_publish"
00147         msg_class = Int32
00148 
00149         msgs = []
00150         for i in range(100):
00151             msg = Int32()
00152             msg.data = i
00153             msgs.append(msg)
00154 
00155         received = {"msgs": []}
00156         def callback(msg):
00157             received["msgs"].append(msg)
00158 
00159         rospy.Subscriber(topic, msg_class, callback)
00160 
00161         listener = PublisherConsistencyListener()
00162         publisher = rospy.Publisher(topic, msg_class)
00163         listener.attach(publisher)
00164         for msg in msgs:
00165             publisher.publish(msg)
00166         sleep(0.5)
00167 
00168         self.assertEqual(len(received["msgs"]), len(msgs))
00169         self.assertEqual(received["msgs"], msgs)
00170 
00171 
00172 PKG = 'rosbridge_library'
00173 NAME = 'test_publisher_consistency_listener'
00174 if __name__ == '__main__':
00175     rostest.unitrun(PKG, NAME, TestPublisherConsistencyListener)
00176 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Wed Sep 13 2017 03:18:07