test_multi_unregistering.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 
00007 from time import sleep, time
00008 
00009 from rosbridge_library.internal.publishers import *
00010 from rosbridge_library.internal.topics import *
00011 from rosbridge_library.internal import ros_loader
00012 from rosbridge_library.internal.message_conversion import *
00013 from std_msgs.msg import String, Int32
00014 
00015 
00016 class TestMultiUnregistering(unittest.TestCase):
00017 
00018     def setUp(self):
00019         rospy.init_node("test_multi_unregistering")
00020 
00021     def test_publish_once(self):
00022         """ Make sure that publishing works """
00023         topic = "/test_publish_once"
00024         msg_type = "std_msgs/String"
00025         msg = {"data": "why halo thar"}
00026 
00027         received = {"msg": None}
00028         def cb(msg):
00029             received["msg"] = msg
00030 
00031         rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb)
00032         p = MultiPublisher(topic, msg_type)
00033         p.publish(msg)
00034 
00035         sleep(1)
00036 
00037         self.assertEqual(received["msg"].data, msg["data"])
00038 
00039     def test_publish_twice(self):
00040         """ Make sure that publishing works """
00041         topic = "/test_publish_twice"
00042         msg_type = "std_msgs/String"
00043         msg = {"data": "why halo thar"}
00044 
00045         received = {"msg": None}
00046         def cb(msg):
00047             received["msg"] = msg
00048 
00049         rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb)
00050         p = MultiPublisher(topic, msg_type)
00051         p.publish(msg)
00052 
00053         sleep(1)
00054 
00055         self.assertEqual(received["msg"].data, msg["data"])
00056 
00057         p.unregister()
00058         # The publisher went away at time T. Here's the timeline of the events:
00059         # T+1 seconds - the subscriber will retry to reconnect - fail
00060         # T+3 seconds - the subscriber will retry to reconnect - fail
00061         # T+5 seconds - publish msg -> it's gone
00062         # T+7 seconds - the subscriber will retry to reconnect - success
00063         # T+8 seconds - publish msg -> OK
00064         # T+11 seconds - we receive the message. Looks like a bug in reconnection...
00065         #                https://github.com/ros/ros_comm/blob/indigo-devel/clients/rospy/src/rospy/impl/tcpros_base.py#L733
00066         #                That line should probably be indented.
00067         sleep(5)
00068 
00069         received["msg"] = None
00070         self.assertIsNone(received["msg"])
00071         p = MultiPublisher(topic, msg_type)
00072         p.publish(msg)
00073 
00074         self.assertIsNone(received["msg"])
00075 
00076         sleep(3)
00077         p.publish(msg)
00078         sleep(2)
00079         # Next two lines should be removed when this is fixed:
00080         # https://github.com/ros/ros_comm/blob/indigo-devel/clients/rospy/src/rospy/impl/tcpros_base.py#L733
00081         self.assertIsNone(received["msg"])
00082         sleep(3)
00083         self.assertEqual(received["msg"].data, msg["data"])
00084 
00085 
00086 PKG = 'rosbridge_library'
00087 NAME = 'test_multi_unregistering'
00088 if __name__ == '__main__':
00089     rostest.unitrun(PKG, NAME, TestMultiUnregistering)


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Thu Jun 6 2019 21:51:43