Go to the documentation of this file.00001
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006
00007 from time import sleep, time
00008
00009 from rosbridge_library.internal.publishers import *
00010 from rosbridge_library.internal.topics import *
00011 from rosbridge_library.internal import ros_loader
00012 from rosbridge_library.internal.message_conversion import *
00013 from std_msgs.msg import String, Int32
00014
00015
00016 class TestMultiUnregistering(unittest.TestCase):
00017
00018 def setUp(self):
00019 rospy.init_node("test_multi_unregistering")
00020
00021 def test_publish_once(self):
00022 """ Make sure that publishing works """
00023 topic = "/test_publish_once"
00024 msg_type = "std_msgs/String"
00025 msg = {"data": "why halo thar"}
00026
00027 received = {"msg": None}
00028 def cb(msg):
00029 received["msg"] = msg
00030
00031 rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb)
00032 p = MultiPublisher(topic, msg_type)
00033 p.publish(msg)
00034
00035 sleep(1)
00036
00037 self.assertEqual(received["msg"].data, msg["data"])
00038
00039 def test_publish_twice(self):
00040 """ Make sure that publishing works """
00041 topic = "/test_publish_twice"
00042 msg_type = "std_msgs/String"
00043 msg = {"data": "why halo thar"}
00044
00045 received = {"msg": None}
00046 def cb(msg):
00047 received["msg"] = msg
00048
00049 rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb)
00050 p = MultiPublisher(topic, msg_type)
00051 p.publish(msg)
00052
00053 sleep(1)
00054
00055 self.assertEqual(received["msg"].data, msg["data"])
00056
00057 p.unregister()
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067 sleep(5)
00068
00069 received["msg"] = None
00070 self.assertIsNone(received["msg"])
00071 p = MultiPublisher(topic, msg_type)
00072 p.publish(msg)
00073
00074 self.assertIsNone(received["msg"])
00075
00076 sleep(3)
00077 p.publish(msg)
00078 sleep(2)
00079
00080
00081 self.assertIsNone(received["msg"])
00082 sleep(3)
00083 self.assertEqual(received["msg"].data, msg["data"])
00084
00085
00086 PKG = 'rosbridge_library'
00087 NAME = 'test_multi_unregistering'
00088 if __name__ == '__main__':
00089 rostest.unitrun(PKG, NAME, TestMultiUnregistering)