test_subscribe.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 PKG = 'rosbridge_library'
00003 import roslib
00004 roslib.load_manifest(PKG)
00005 roslib.load_manifest("std_msgs")
00006 import rospy
00007 
00008 import unittest
00009 import time
00010 
00011 from json import loads, dumps
00012 from std_msgs.msg import String
00013 
00014 from rosbridge_library.capabilities import subscribe
00015 from rosbridge_library.protocol import Protocol
00016 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
00017 
00018 
00019 class TestSubscribe(unittest.TestCase):
00020 
00021     def setUp(self):
00022         rospy.init_node("test_subscribe")
00023 
00024     def dummy_cb(self, msg):
00025         pass
00026 
00027     def test_update_params(self):
00028         """ Adds a bunch of random clients to the subscription and sees whether
00029         the correct parameters are chosen as the min """
00030         client_id = "client_test_update_params"
00031         topic = "/test_update_params"
00032         msg_type = "std_msgs/String"
00033 
00034         subscription = subscribe.Subscription(client_id, topic, None)
00035 
00036         min_throttle_rate = 5
00037         min_queue_length = 2
00038         min_frag_size = 20
00039 
00040         for throttle_rate in range(min_throttle_rate, min_throttle_rate + 10):
00041             for queue_length in range(min_queue_length, min_queue_length + 10):
00042                 for frag_size in range(min_frag_size, min_frag_size + 10):
00043                     sid = throttle_rate * 100 + queue_length * 10 + frag_size
00044                     subscription.subscribe(sid, msg_type, throttle_rate,
00045                                            queue_length, frag_size)
00046 
00047         subscription.update_params()
00048 
00049         try:
00050             self.assertEqual(subscription.throttle_rate, min_throttle_rate)
00051             self.assertEqual(subscription.queue_length, min_queue_length)
00052             self.assertEqual(subscription.fragment_size, min_frag_size)
00053             self.assertEqual(subscription.compression, "none")
00054 
00055             subscription.clients.values()[0]["compression"] = "png"
00056 
00057             subscription.update_params()
00058 
00059             self.assertEqual(subscription.throttle_rate, min_throttle_rate)
00060             self.assertEqual(subscription.queue_length, min_queue_length)
00061             self.assertEqual(subscription.fragment_size, min_frag_size)
00062             self.assertEqual(subscription.compression, "png")
00063         except:
00064             subscription.unregister()
00065             raise
00066 
00067         subscription.unregister()
00068 
00069     def test_missing_arguments(self):
00070         proto = Protocol("test_missing_arguments")
00071         sub = subscribe.Subscribe(proto)
00072         msg = {"op": "subscribe"}
00073         self.assertRaises(MissingArgumentException, sub.subscribe, msg)
00074 
00075     def test_invalid_arguments(self):
00076         proto = Protocol("test_invalid_arguments")
00077         sub = subscribe.Subscribe(proto)
00078 
00079         msg = {"op": "subscribe", "topic": 3}
00080         self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00081 
00082         msg = {"op": "subscribe", "topic": "/jon", "type": 3}
00083         self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00084 
00085         msg = {"op": "subscribe", "topic": "/jon", "throttle_rate": "fast"}
00086         self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00087 
00088         msg = {"op": "subscribe", "topic": "/jon", "fragment_size": "five cubits"}
00089         self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00090 
00091         msg = {"op": "subscribe", "topic": "/jon", "queue_length": "long"}
00092         self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00093 
00094         msg = {"op": "subscribe", "topic": "/jon", "compression": 9000}
00095         self.assertRaises(InvalidArgumentException, sub.subscribe, msg)
00096 
00097     def test_subscribe_works(self):
00098         proto = Protocol("test_subscribe_works")
00099         sub = subscribe.Subscribe(proto)
00100         topic = "/test_subscribe_works"
00101         msg = String()
00102         msg.data = "test test_subscribe_works works"
00103         msg_type = "std_msgs/String"
00104 
00105         received = {"msg": None}
00106 
00107         def send(outgoing):
00108             received["msg"] = outgoing
00109 
00110         proto.send = send
00111 
00112         sub.subscribe(loads(dumps({"op": "subscribe", "topic": topic, "type": msg_type})))
00113 
00114         p = rospy.Publisher(topic, String)
00115         time.sleep(0.25)
00116         p.publish(msg)
00117 
00118         time.sleep(0.25)
00119         self.assertEqual(received["msg"]["msg"]["data"], msg.data)
00120 
00121 if __name__ == '__main__':
00122     import rostest
00123     rostest.rosrun(PKG, 'test_subscribe', TestSubscribe)


rosbridge_test
Author(s): Jonathan Mace
autogenerated on Thu Jan 2 2014 11:54:04