00001
00002
00003 import sys
00004 import time
00005 import subprocess
00006 import unittest
00007
00008 import roslib; roslib.load_manifest('network_monitor_udp')
00009 import rospy
00010 import rostest
00011
00012 from network_monitor_udp.linktest import UdpmonsourceHandle
00013 from network_monitor_udp.linktest import LinkTest
00014 from network_monitor_udp.msg import LinktestGoal
00015
00016 from tc_port_control import TcPortControl
00017
class AdaptiveBandwidthTest(unittest.TestCase):
    """Checks adaptive-bandwidth link tests against a rate-limited loopback sink.

    Each test sets a rate limit through ``TcPortControl`` and then asserts
    that the bandwidth (or capacity) reported via the ``performance_test``
    source node falls inside a tolerance band around that limit.
    """

    # Sink endpoint and latency threshold used by every test in this class.
    SINK_IP = "127.0.0.1"
    SINK_PORT = 12345
    LATENCY_THRESHOLD = 0.1

    def __init__(self, *args):
        """Initialise the ROS node, the source-node handle and the tc controller."""
        super(AdaptiveBandwidthTest, self).__init__(*args)
        rospy.init_node('network_monitor_udp_test')
        self.srcnode = UdpmonsourceHandle('performance_test')
        self.tc = TcPortControl(self)
        self.tc.reset()

    def setUp(self):
        """Cancel any link tests left on the source node and initialise tc."""
        self.srcnode.cancel_all_tests()
        self.tc.init()

    def tearDown(self):
        """Reset tc after every test."""
        self.tc.reset()

    def _create_adaptive_test(self, bw, duration):
        """Create (without starting) a ``BW_ADAPTIVE`` link test to the shared sink.

        bw       -- forwarded as ``bw`` to ``create_test``
        duration -- forwarded as ``duration`` to ``create_test``

        Returns the object produced by ``self.srcnode.create_test``.
        """
        return self.srcnode.create_test(
            bw=bw, pktsize=1500, duration=duration,
            sink_ip=self.SINK_IP, sink_port=self.SINK_PORT,
            bw_type=LinktestGoal.BW_ADAPTIVE, update_interval=0.2,
            latency_threshold=self.LATENCY_THRESHOLD)

    def _measure_capacity(self):
        """Return ``get_link_capacity`` for the shared sink and latency threshold."""
        return self.srcnode.get_link_capacity(
            sink_ip=self.SINK_IP, sink_port=self.SINK_PORT,
            latency_threshold=self.LATENCY_THRESHOLD)

    def _assert_rate_between(self, rate, lower, upper, nominal):
        """Fail unless ``lower < rate < upper`` (both bounds exclusive).

        rate    -- measured value; callers sample it exactly once so that the
                   value checked is the same value shown in the message
        lower   -- exclusive lower bound, same unit as ``rate``
        upper   -- exclusive upper bound, same unit as ``rate``
        nominal -- text naming the expected rate in Mbit/s for the message
        """
        self.assertTrue(
            lower < rate < upper,
            "Expected capacity on loopback interface to be ~%sMbit/s, instead it was %.2fMbit/s"
            % (nominal, rate / 1e6))

    def test_ramp_up(self):
        """Request 1e6 under a 5e6 rate limit and expect ~5 Mbit/s after 5.5 s."""
        test = self._create_adaptive_test(bw=1.0e6, duration=5.0)
        self.tc.set_rate_limit(5e6)
        test.start()
        time.sleep(5.5)
        self._assert_rate_between(test.bandwidth.movavg(5), 4.3e6, 5.7e6, "5")

    def test_ramp_down(self):
        """Drop the rate limit from 5e6 to 1e6 mid-test and expect ~1 Mbit/s."""
        test = self._create_adaptive_test(bw=5.0e6, duration=5.0)
        self.tc.set_rate_limit(5e6)
        test.start()
        time.sleep(2.0)
        self.tc.set_rate_limit(1e6)
        time.sleep(3.0)
        self._assert_rate_between(test.bandwidth.movavg(5), 0.6e6, 1.4e6, "1")

    def test_varying_capacity(self):
        """Step the rate limit 10e6 -> 1e6 -> 5e6 and check the reading after each step."""
        test = self._create_adaptive_test(bw=2 * 10**6, duration=15.0)
        self.tc.set_rate_limit(10e6)
        test.start()
        time.sleep(4.0)
        # The link test is still running here, so take a single sample per check.
        self._assert_rate_between(test.bandwidth.movavg(5), 9e6, 11e6, "10")
        self.tc.set_rate_limit(1e6)
        time.sleep(4.0)
        self._assert_rate_between(test.bandwidth.movavg(5), 0.6e6, 1.4e6, "1")
        self.tc.set_rate_limit(5e6)
        time.sleep(6.0)
        # The lower bound here is looser (3.5e6) than in test_ramp_up; kept as in the original.
        self._assert_rate_between(test.bandwidth.movavg(5), 3.5e6, 5.7e6, "5")

    def test_get_capacity(self):
        """Check ``get_link_capacity`` under rate limits of 5e6, 20e6 and 0.5e6."""
        self.tc.set_rate_limit(5e6)
        self._assert_rate_between(self._measure_capacity(), 4.3e6, 5.7e6, "5")
        self.tc.set_rate_limit(20e6)
        self._assert_rate_between(self._measure_capacity(), 15e6, 25e6, "20")
        self.tc.set_rate_limit(0.5e6)
        self._assert_rate_between(self._measure_capacity(), 0.3e6, 0.7e6, "0.5")
00097
if __name__ == '__main__':
    # Run the suite under rostest when this file is executed as a script.
    try:
        rostest.run('network_monitor_udp', 'adaptive_bw_test', AdaptiveBandwidthTest)
    except KeyboardInterrupt:
        # Ctrl-C during the run is deliberately ignored so the script exits
        # without a traceback.
        pass