test_multi_unregistering.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 PKG = 'rosbridge_library'
00003 import roslib; roslib.load_manifest(PKG); roslib.load_manifest("std_msgs")
00004 import rospy
00005 from rospy import get_published_topics
00006 
00007 from time import sleep, time
00008 
00009 from rosbridge_library.internal.publishers import *
00010 from rosbridge_library.internal.topics import *
00011 from rosbridge_library.internal import ros_loader
00012 from rosbridge_library.internal.message_conversion import *
00013 from std_msgs.msg import String, Int32
00014 
00015 import unittest
00016 
00017 class TestMultiUnregistering(unittest.TestCase):
00018 
00019     def setUp(self):
00020         rospy.init_node("test_multi_unregistering")
00021 
00022     def test_publish_once(self):
00023         """ Make sure that publishing works """
00024         topic = "/test_publish_once"
00025         msg_type = "std_msgs/String"
00026         msg = {"data": "why halo thar"}
00027 
00028         received = {"msg": None}
00029         def cb(msg):
00030             received["msg"] = msg
00031 
00032         rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb)
00033         p = MultiPublisher(topic, msg_type)
00034         p.publish(msg)
00035 
00036         sleep(1)
00037 
00038         self.assertEqual(received["msg"].data, msg["data"])
00039 
00040     def test_publish_twice(self):
00041         """ Make sure that publishing works """
00042         topic = "/test_publish_twice"
00043         msg_type = "std_msgs/String"
00044         msg = {"data": "why halo thar"}
00045 
00046         received = {"msg": None}
00047         def cb(msg):
00048             received["msg"] = msg
00049 
00050         rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb)
00051         p = MultiPublisher(topic, msg_type)
00052         p.publish(msg)
00053 
00054         sleep(1)
00055 
00056         self.assertEqual(received["msg"].data, msg["data"])
00057         
00058         p.unregister()
00059         sleep(5)
00060         
00061         received["msg"] = None
00062         self.assertIsNone(received["msg"])
00063         p = MultiPublisher(topic, msg_type)
00064         p.publish(msg)
00065 
00066         sleep(1)
00067 
00068         self.assertEqual(received["msg"].data, msg["data"])
00069         
00070         
00071 
00072 if __name__ == '__main__':
00073     import rostest
00074     rostest.rosrun(PKG, 'test_multi_unregistering', TestMultiUnregistering)


rosbridge_test
Author(s): Jonathan Mace
autogenerated on Thu Jan 2 2014 11:54:04