$search
00001 #! /usr/bin/env python 00002 00003 import sys 00004 import time 00005 import subprocess 00006 import unittest 00007 00008 import roslib; roslib.load_manifest('network_monitor_udp') 00009 import rospy 00010 import rostest 00011 00012 from network_monitor_udp.linktest import UdpmonsourceHandle 00013 from network_monitor_udp.linktest import LinkTest 00014 from network_monitor_udp.msg import LinktestGoal 00015 00016 from tc_port_control import TcPortControl 00017 00018 class AdaptiveBandwidthTest(unittest.TestCase): 00019 def __init__(self, *args): 00020 super(AdaptiveBandwidthTest, self).__init__(*args) 00021 rospy.init_node('network_monitor_udp_test') 00022 self.srcnode = UdpmonsourceHandle('performance_test') 00023 self.tc = TcPortControl(self) 00024 self.tc.reset() 00025 00026 def setUp(self): 00027 self.srcnode.cancel_all_tests() 00028 self.tc.init() 00029 00030 def tearDown(self): 00031 self.tc.reset() 00032 00033 def test_ramp_up(self): 00034 test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 1500, duration = 5.0, 00035 sink_ip = "127.0.0.1", sink_port = 12345, 00036 bw_type = LinktestGoal.BW_ADAPTIVE, update_interval = 0.2, 00037 latency_threshold = 0.1) 00038 self.tc.set_rate_limit(5e6) 00039 test.start() 00040 time.sleep(5.5) 00041 self.assertTrue(test.bandwidth.movavg(5) > 4.3e6 and test.bandwidth.movavg(5) < 5.7e6, 00042 "Expected capacity on loopback interface to be ~5Mbit/s, instead it was %.2fMbit/s"% 00043 (test.bandwidth.movavg(5)/1e6)) 00044 00045 def test_ramp_down(self): 00046 test = self.srcnode.create_test(bw = 5.0*10**6, pktsize = 1500, duration = 5.0, 00047 sink_ip = "127.0.0.1", sink_port = 12345, 00048 bw_type = LinktestGoal.BW_ADAPTIVE, update_interval = 0.2, 00049 latency_threshold = 0.1) 00050 self.tc.set_rate_limit(5e6) 00051 test.start() 00052 time.sleep(2.0) 00053 self.tc.set_rate_limit(1e6) 00054 time.sleep(3.0) 00055 self.assertTrue(test.bandwidth.movavg(5) > 0.6e6 and test.bandwidth.movavg(5) < 1.4e6, 00056 "Expected capacity on loopback interface to be ~1Mbit/s, instead it was %.2fMbit/s"% 00057 (test.bandwidth.movavg(5)/1e6)) 00058 00059 def test_varying_capacity(self): 00060 test = self.srcnode.create_test(bw = 2*10**6, pktsize = 1500, duration = 15.0, 00061 sink_ip = "127.0.0.1", sink_port = 12345, 00062 bw_type = LinktestGoal.BW_ADAPTIVE, update_interval = 0.2, 00063 latency_threshold = 0.1) 00064 self.tc.set_rate_limit(10e6) 00065 test.start() 00066 time.sleep(4.0) 00067 self.assertTrue(test.bandwidth.movavg(5) > 9e6 and test.bandwidth.movavg(5) < 11e6, 00068 "Expected capacity on loopback interface to be ~10Mbit/s, instead it was %.2fMbit/s"% 00069 (test.bandwidth.movavg(5)/1e6)) 00070 self.tc.set_rate_limit(1e6) 00071 time.sleep(4.0) 00072 self.assertTrue(test.bandwidth.movavg(5) > 0.6e6 and test.bandwidth.movavg(5) < 1.4e6, 00073 "Expected capacity on loopback interface to be ~1Mbit/s, instead it was %.2fMbit/s"% 00074 (test.bandwidth.movavg(5)/1e6)) 00075 self.tc.set_rate_limit(5e6) 00076 time.sleep(6.0) 00077 self.assertTrue(test.bandwidth.movavg(5) > 3.5e6 and test.bandwidth.movavg(5) < 5.7e6, 00078 "Expected capacity on loopback interface to be ~5Mbit/s, instead it was %.2fMbit/s"% 00079 (test.bandwidth.movavg(5)/1e6)) 00080 00081 def test_get_capacity(self): 00082 self.tc.set_rate_limit(5e6) 00083 capacity = self.srcnode.get_link_capacity(sink_ip = "127.0.0.1", sink_port = 12345, latency_threshold = 0.1) 00084 self.assertTrue(capacity > 4.3e6 and capacity < 5.7e6, 00085 "Expected capacity on loopback interface to be ~5Mbit/s, instead it was %.2fMbit/s"% 00086 (capacity/1e6)) 00087 self.tc.set_rate_limit(20e6) 00088 capacity = self.srcnode.get_link_capacity(sink_ip = "127.0.0.1", sink_port = 12345, latency_threshold = 0.1) 00089 self.assertTrue(capacity > 15e6 and capacity < 25e6, 00090 "Expected capacity on loopback interface to be ~20Mbit/s, instead it was %.2fMbit/s"% 00091 (capacity/1e6)) 00092 self.tc.set_rate_limit(0.5e6) 00093 capacity = self.srcnode.get_link_capacity(sink_ip = "127.0.0.1", sink_port = 12345, latency_threshold = 0.1) 00094 self.assertTrue(capacity > 0.3e6 and capacity < 0.7e6, 00095 "Expected capacity on loopback interface to be ~0.5Mbit/s, instead it was %.2fMbit/s"% 00096 (capacity/1e6)) 00097 00098 if __name__ == '__main__': 00099 try: 00100 rostest.run('network_monitor_udp', 'adaptive_bw_test', AdaptiveBandwidthTest) 00101 except KeyboardInterrupt, e: 00102 pass