test_publisher_consistency_listener.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 from __future__ import print_function
00003 import sys
00004 import rospy
00005 import rostest
00006 import unittest
00007 
00008 from time import sleep, time
00009 
00010 from rosbridge_library.internal.publishers import *
00011 from rosbridge_library.internal import ros_loader
00012 from rosbridge_library.internal.message_conversion import *
00013 from std_msgs.msg import String, Int32
00014 
00015 
00016 class TestPublisherConsistencyListener(unittest.TestCase):
00017 
00018     def setUp(self):
00019         rospy.init_node("test_publisher_consistency_listener")
00020 
00021     def test_listener_timeout(self):
00022         """ See whether the listener can correctly time out """
00023         topic = "/test_listener_timeout"
00024         type = String
00025 
00026         publisher = rospy.Publisher(topic, type)
00027 
00028         listener = PublisherConsistencyListener()
00029         listener.attach(publisher)
00030 
00031         self.assertFalse(listener.timed_out())
00032 
00033         sleep(listener.timeout / 2.0)
00034 
00035         self.assertFalse(listener.timed_out())
00036 
00037         sleep(listener.timeout / 2.0 + 0.1)
00038 
00039         self.assertTrue(listener.timed_out())
00040 
00041     def test_listener_attach_detach(self):
00042         """ See whether the listener actually attaches and detaches itself """
00043         topic = "/test_listener_attach_detach"
00044         type = String
00045 
00046         publisher = rospy.Publisher(topic, type)
00047         orig_publish = publisher.publish
00048 
00049         listener = PublisherConsistencyListener()
00050         listener_publish = listener.publish_override
00051 
00052         self.assertNotEqual(orig_publish, listener_publish)
00053         self.assertNotIn(listener, publisher.impl.subscriber_listeners)
00054 
00055         listener.attach(publisher)
00056 
00057         self.assertEqual(publisher.publish, listener_publish)
00058         self.assertNotEqual(publisher.publish, orig_publish)
00059         self.assertIn(listener, publisher.impl.subscriber_listeners)
00060 
00061         listener.detach()
00062 
00063         self.assertEqual(publisher.publish, orig_publish)
00064         self.assertNotEqual(publisher.publish, listener_publish)
00065         self.assertNotIn(listener, publisher.impl.subscriber_listeners)
00066 
00067     def test_immediate_publish_fails_without(self):
00068         """ This test makes sure the failure case that the PublisherConsistency
00069         Listener is trying to solve, is indeed a failure case """
00070         topic = "/test_immediate_publish_fails_without"
00071         msg_class = String
00072 
00073         msg = String()
00074         string = "why halo thar"
00075         msg.data = string
00076 
00077         received = {"msg": None}
00078         def callback(msg):
00079             received["msg"] = msg
00080 
00081         rospy.Subscriber(topic, msg_class, callback)
00082         publisher = rospy.Publisher(topic, msg_class)
00083         publisher.publish(msg)
00084         sleep(0.5)
00085 
00086         self.assertNotEqual(received["msg"], msg)
00087         self.assertEqual(received["msg"], None)
00088 
00089     def test_immediate_publish(self):
00090         """ This test makes sure the PublisherConsistencyListener is working"""
00091         topic = "/test_immediate_publish"
00092         msg_class = String
00093 
00094         msg = String()
00095         string = "why halo thar"
00096         msg.data = string
00097 
00098         received = {"msg": None}
00099         def callback(msg):
00100             print("Received a msg! ", msg)
00101             received["msg"] = msg
00102 
00103         rospy.Subscriber(topic, msg_class, callback)
00104 
00105         class temp_listener(rospy.SubscribeListener):
00106             def peer_subscribe(self, topic_name, topic_publish, peer_publish):
00107                 print("peer subscribe in temp listener")
00108 
00109         listener = PublisherConsistencyListener()
00110         publisher = rospy.Publisher(topic, msg_class, temp_listener())
00111         listener.attach(publisher)
00112         publisher.publish(msg)
00113         sleep(0.5)
00114 
00115         self.assertEqual(received["msg"], msg)
00116 
00117     def test_immediate_multi_publish_fails_without(self):
00118         """ This test makes sure the failure case that the PublisherConsistency
00119         Listener is trying to solve, is indeed a failure case, even for large
00120         message buffers """
00121         topic = "/test_immediate_multi_publish_fails_without"
00122         msg_class = Int32
00123 
00124         msgs = []
00125         for i in range(100):
00126             msg = Int32()
00127             msg.data = i
00128             msgs.append(msg)
00129 
00130         received = {"msgs": []}
00131         def callback(msg):
00132             received["msgs"].append(msg)
00133 
00134         rospy.Subscriber(topic, msg_class, callback)
00135 
00136         publisher = rospy.Publisher(topic, msg_class)
00137         for msg in msgs:
00138             publisher.publish(msg)
00139         sleep(0.5)
00140 
00141         self.assertEqual(len(received["msgs"]), 0)
00142         self.assertNotEqual(received["msgs"], msgs)
00143 
00144     def test_immediate_multi_publish(self):
00145         """ This test makes sure the PublisherConsistencyListener is working
00146         even with a huge message buffer"""
00147         topic = "/test_immediate_multi_publish"
00148         msg_class = Int32
00149 
00150         msgs = []
00151         for i in range(100):
00152             msg = Int32()
00153             msg.data = i
00154             msgs.append(msg)
00155 
00156         received = {"msgs": []}
00157         def callback(msg):
00158             received["msgs"].append(msg)
00159 
00160         rospy.Subscriber(topic, msg_class, callback)
00161 
00162         listener = PublisherConsistencyListener()
00163         publisher = rospy.Publisher(topic, msg_class)
00164         listener.attach(publisher)
00165         for msg in msgs:
00166             publisher.publish(msg)
00167         sleep(0.5)
00168 
00169         self.assertEqual(len(received["msgs"]), len(msgs))
00170         self.assertEqual(received["msgs"], msgs)
00171 
00172 
00173 PKG = 'rosbridge_library'
00174 NAME = 'test_publisher_consistency_listener'
00175 if __name__ == '__main__':
00176     rostest.unitrun(PKG, NAME, TestPublisherConsistencyListener)
00177 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Thu Jun 6 2019 21:51:43