test_subscribe.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 import time
00007 
00008 from json import loads, dumps
00009 from std_msgs.msg import String
00010 
00011 from rosbridge_library.capabilities import subscribe
00012 from rosbridge_library.protocol import Protocol
00013 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
00014 
00015 
00016 class TestSubscribe(unittest.TestCase):
00017 
00018     def setUp(self):
00019         rospy.init_node("test_subscribe")
00020 
00021     def dummy_cb(self, msg):
00022         pass
00023 
00024     def test_update_params(self):
00025         """ Adds a bunch of random clients to the subscription and sees whether
00026         the correct parameters are chosen as the min """
00027         client_id = "client_test_update_params"
00028         topic = "/test_update_params"
00029         msg_type = "std_msgs/String"
00030 
00031         subscription = subscribe.Subscription(client_id, topic, None)
00032 
00033         min_throttle_rate = 5
00034         min_queue_length = 2
00035         min_frag_size = 20
00036 
00037         for throttle_rate in range(min_throttle_rate, min_throttle_rate + 10):
00038             for queue_length in range(min_queue_length, min_queue_length + 10):
00039                 for frag_size in range(min_frag_size, min_frag_size + 10):
00040                     sid = throttle_rate * 100 + queue_length * 10 + frag_size
00041                     subscription.subscribe(sid, msg_type, throttle_rate,
00042                                            queue_length, frag_size)
00043 
00044         subscription.update_params()
00045 
00046         try:
00047             self.assertEqual(subscription.throttle_rate, min_throttle_rate)
00048             self.assertEqual(subscription.queue_length, min_queue_length)
00049             self.assertEqual(subscription.fragment_size, min_frag_size)
00050             self.assertEqual(subscription.compression, "none")
00051 
00052             list(subscription.clients.values())[0]["compression"] = "png"
00053 
00054             subscription.update_params()
00055 
00056             self.assertEqual(subscription.throttle_rate, min_throttle_rate)
00057             self.assertEqual(subscription.queue_length, min_queue_length)
00058             self.assertEqual(subscription.fragment_size, min_frag_size)
00059             self.assertEqual(subscription.compression, "png")
00060         except:
00061             subscription.unregister()
00062             raise
00063 
00064         subscription.unregister()
00065 
00066     def test_missing_arguments(self):
00067         proto = Protocol("test_missing_arguments")
00068         sub = subscribe.Subscribe(proto)
00069         msg = {"op": "subscribe"}
00070         self.assertRaises(MissingArgumentException, sub.subscribe, msg)
00071 
00072     def test_invalid_arguments(self):
00073         proto = Protocol("test_invalid_arguments")
00074         sub = subscribe.Subscribe(proto)
00075 
00076         msg = {"op": "subscribe", "topic": 3}
00077         self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00078 
00079         msg = {"op": "subscribe", "topic": "/jon", "type": 3}
00080         self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00081 
00082         msg = {"op": "subscribe", "topic": "/jon", "throttle_rate": "fast"}
00083         self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00084 
00085         msg = {"op": "subscribe", "topic": "/jon", "fragment_size": "five cubits"}
00086         self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00087 
00088         msg = {"op": "subscribe", "topic": "/jon", "queue_length": "long"}
00089         self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00090 
00091         msg = {"op": "subscribe", "topic": "/jon", "compression": 9000}
00092         self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00093 
00094     def test_subscribe_works(self):
00095         proto = Protocol("test_subscribe_works")
00096         sub = subscribe.Subscribe(proto)
00097         topic = "/test_subscribe_works"
00098         msg = String()
00099         msg.data = "test test_subscribe_works works"
00100         msg_type = "std_msgs/String"
00101 
00102         received = {"msg": None}
00103 
00104         def send(outgoing):
00105             received["msg"] = outgoing
00106 
00107         proto.send = send
00108 
00109         sub.subscribe(loads(dumps({"op": "subscribe", "topic": topic, "type": msg_type})))
00110 
00111         p = rospy.Publisher(topic, String, queue_size=5)
00112         time.sleep(0.25)
00113         p.publish(msg)
00114 
00115         time.sleep(0.25)
00116         self.assertEqual(received["msg"]["msg"]["data"], msg.data)
00117 
00118 
00119 PKG = 'rosbridge_library'
00120 NAME = 'test_subscribe'
00121 if __name__ == '__main__':
00122     rostest.unitrun(PKG, NAME, TestSubscribe)
00123 


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Thu Jun 6 2019 21:51:43