Go to the documentation of this file.00001
00002 PKG = 'rosbridge_library'
00003 import roslib; roslib.load_manifest(PKG); roslib.load_manifest("std_msgs")
00004 import rospy
00005 from rospy import get_published_topics
00006
00007 from time import sleep, time
00008
00009 from rosbridge_library.internal.publishers import *
00010 from rosbridge_library.internal.topics import *
00011 from rosbridge_library.internal import ros_loader
00012 from rosbridge_library.internal.message_conversion import *
00013 from std_msgs.msg import String, Int32
00014
00015 import unittest
00016
00017 class TestMultiUnregistering(unittest.TestCase):
00018
00019 def setUp(self):
00020 rospy.init_node("test_multi_unregistering")
00021
00022 def test_publish_once(self):
00023 """ Make sure that publishing works """
00024 topic = "/test_publish_once"
00025 msg_type = "std_msgs/String"
00026 msg = {"data": "why halo thar"}
00027
00028 received = {"msg": None}
00029 def cb(msg):
00030 received["msg"] = msg
00031
00032 rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb)
00033 p = MultiPublisher(topic, msg_type)
00034 p.publish(msg)
00035
00036 sleep(1)
00037
00038 self.assertEqual(received["msg"].data, msg["data"])
00039
00040 def test_publish_twice(self):
00041 """ Make sure that publishing works """
00042 topic = "/test_publish_twice"
00043 msg_type = "std_msgs/String"
00044 msg = {"data": "why halo thar"}
00045
00046 received = {"msg": None}
00047 def cb(msg):
00048 received["msg"] = msg
00049
00050 rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb)
00051 p = MultiPublisher(topic, msg_type)
00052 p.publish(msg)
00053
00054 sleep(1)
00055
00056 self.assertEqual(received["msg"].data, msg["data"])
00057
00058 p.unregister()
00059 sleep(5)
00060
00061 received["msg"] = None
00062 self.assertIsNone(received["msg"])
00063 p = MultiPublisher(topic, msg_type)
00064 p.publish(msg)
00065
00066 sleep(1)
00067
00068 self.assertEqual(received["msg"].data, msg["data"])
00069
00070
00071
00072 if __name__ == '__main__':
00073 import rostest
00074 rostest.rosrun(PKG, 'test_multi_unregistering', TestMultiUnregistering)