$search
00001 #! /usr/bin/env python 00002 00003 import sys 00004 import time 00005 import subprocess 00006 import unittest 00007 00008 import roslib; roslib.load_manifest('network_monitor_udp') 00009 import rospy 00010 import rostest 00011 00012 from network_monitor_udp.linktest import UdpmonsourceHandle 00013 from network_monitor_udp.linktest import LinkTest 00014 from network_monitor_udp.msg import LinktestGoal 00015 00016 from tc_port_control import TcPortControl 00017 00018 class ConstantBandwidthTest(unittest.TestCase): 00019 def __init__(self, *args): 00020 super(ConstantBandwidthTest, self).__init__(*args) 00021 rospy.init_node('network_monitor_udp_test') 00022 self.srcnode = UdpmonsourceHandle('performance_test') 00023 self.tc = TcPortControl(self) 00024 self.tc.reset() 00025 00026 def setUp(self): 00027 self.srcnode.cancel_all_tests() 00028 self.tc.init() 00029 00030 def tearDown(self): 00031 self.tc.reset() 00032 00033 def test_basic(self): 00034 test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 1500, duration = 3.0, 00035 sink_ip = "127.0.0.1", sink_port = 12345, 00036 bw_type = LinktestGoal.BW_CONSTANT) 00037 test.start() 00038 time.sleep(3.5) 00039 self.assertTrue(test.done, "Test should have finished already") 00040 self.assertTrue(test.overall_latency > 0.0 and test.overall_latency < 0.005, 00041 "Expected latency on loopback interface to be positive and under 5ms, instead it was %.2fms"% 00042 (test.overall_latency * 1000)) 00043 self.assertAlmostEqual(test.overall_loss, 0.0, 2, 00044 "Expected packet loss on loopback interface to be zero, instead it was %.2f%%"% 00045 (test.overall_loss)) 00046 self.assertTrue(test.overall_bandwidth > 0.9 * 10**6, 00047 "Expected useful bandwidth on loopback interface to be at least 0.9Mbit/s, instead it was %.2fMbit/s"% 00048 (test.overall_bandwidth/1e6)) 00049 00050 00051 def test_bw_measurement(self): 00052 test = self.srcnode.create_test(bw = 3.0*10**6, pktsize = 1500, duration = 5.0, 00053 sink_ip = "127.0.0.1", sink_port = 12345, 00054 bw_type = LinktestGoal.BW_CONSTANT) 00055 self.tc.set_rate_limit(1e6) 00056 test.start() 00057 time.sleep(5.5) 00058 self.assertTrue(test.done, "Test should have finished already") 00059 self.assertTrue(test.overall_bandwidth < 1e6 and test.overall_bandwidth > 0.9e6, 00060 "Expected useful bandwidth on loopback interface to be at least 0.9Mbit/s, instead it was %.2fMbit/s"% 00061 (test.overall_bandwidth/1e6)) 00062 00063 def test_bw_measurement_variable_bw(self): 00064 test = self.srcnode.create_test(bw = 5.0*10**6, pktsize = 1500, duration = 5.0, 00065 sink_ip = "127.0.0.1", sink_port = 12345, 00066 bw_type = LinktestGoal.BW_CONSTANT, update_interval = 0.15) 00067 self.tc.set_rate_limit(1e6) 00068 test.start() 00069 time.sleep(2.0) 00070 self.assertTrue(test.bandwidth.movavg(5) < 1.15e6 and test.bandwidth.movavg(5) > 0.85e6, 00071 "Expected useful bandwidth on loopback interface to be ~1Mbit/s, instead it was %.2fMbit/s"% 00072 (test.bandwidth.movavg(5)/1e6)) 00073 self.tc.set_rate_limit(4e6) 00074 time.sleep(2.0) 00075 self.assertTrue(test.bandwidth.movavg(5) < 4.2e6 and test.bandwidth.movavg(5) > 3.8e6, 00076 "Expected useful bandwidth on loopback interface to be ~4Mbit/s, instead it was %.2fMbit/s"% 00077 (test.bandwidth.movavg(5)/1e6)) 00078 self.tc.set_rate_limit(0.25e6) 00079 time.sleep(2.0) 00080 self.assertTrue(test.bandwidth.movavg(5) < 0.35e6 and test.bandwidth.movavg(5) > 0.15e6, 00081 "Expected useful bandwidth on loopback interface to be ~0.25Mbit/s, instead it was %.2fMbit/s"% 00082 (test.bandwidth.movavg(5)/1e6)) 00083 00084 def test_loss_measurement(self): 00085 test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 200, duration = 12.0, 00086 sink_ip = "127.0.0.1", sink_port = 12345, 00087 bw_type = LinktestGoal.BW_CONSTANT, update_interval = 0.15) 00088 self.tc.set_latency_loss(loss = 5) 00089 test.start() 00090 time.sleep(4.0) 00091 self.assertTrue(test.loss.movavg() < 8.5 and test.loss.movavg() > 1.5, 00092 "Expected packet loss on loopback interface to be ~5%%, instead it was %.2f%%"% 00093 (test.loss.movavg())) 00094 self.tc.set_latency_loss(loss = 20) 00095 time.sleep(4.0) 00096 self.assertTrue(test.loss.movavg() < 25.0 and test.loss.movavg() > 15.0, 00097 "Expected packet loss on loopback interface to be ~20%%, instead it was %.2f%%"% 00098 (test.loss.movavg())) 00099 self.tc.set_latency_loss(loss = 0) 00100 time.sleep(4.0) 00101 self.assertTrue(test.loss.movavg() < 1.0, 00102 "Expected packet loss on loopback interface to be ~0%%, instead it was %.2f%%"% 00103 (test.loss.movavg())) 00104 00105 def test_latency_measurement(self): 00106 test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 200, duration = 6.0, 00107 sink_ip = "127.0.0.1", sink_port = 12345, 00108 bw_type = LinktestGoal.BW_CONSTANT, update_interval = 0.2) 00109 test.start() 00110 time.sleep(1.5) 00111 lo_latency = test.latency.movavg(3) 00112 self.tc.set_latency_loss(latency = 0.02) 00113 time.sleep(1.5) 00114 added_latency = test.latency.movavg(3) - lo_latency 00115 self.assertTrue(added_latency > 0.015 and added_latency < 0.025, 00116 "Expected added latency on loopback interface to be ~20ms, instead it was %.2fms"% 00117 (added_latency * 1000)) 00118 self.tc.set_latency_loss(latency = 0.06) 00119 time.sleep(1.5) 00120 added_latency = test.latency.movavg(3) - lo_latency 00121 self.assertTrue(added_latency > 0.052 and added_latency < 0.068, 00122 "Expected added latency on loopback interface to be ~60ms, instead it was %.2fms"% 00123 (added_latency * 1000)) 00124 self.tc.set_latency_loss(latency = 0) 00125 time.sleep(1.5) 00126 added_latency = test.latency.movavg(3) - lo_latency 00127 self.assertTrue(added_latency < 0.002, 00128 "Expected added latency on loopback interface to be ~0ms, instead it was %.2fms"% 00129 (added_latency * 1000)) 00130 00131 def test_latency_loss(self): 00132 test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 200, duration = 3.0, 00133 sink_ip = "127.0.0.1", sink_port = 12345, 00134 bw_type = LinktestGoal.BW_CONSTANT, update_interval = 0.2) 00135 self.tc.set_latency_loss(loss = 5, latency = 0.05) 00136 test.start() 00137 time.sleep(3.5) 00138 self.assertTrue(test.loss.movavg(10) < 7.5 and test.loss.movavg(10) > 2.5, 00139 "Expected packet loss on loopback interface to be ~5%%, instead it was %.2f%%"% 00140 (test.loss.movavg(10))) 00141 self.assertTrue(test.latency.movavg(10) < 0.07 and test.latency.movavg(10) > 0.05, 00142 "Expected latency on loopback interface to be ~60ms, instead it was %.2fms"% 00143 (test.latency.movavg(10)* 1000)) 00144 00145 if __name__ == '__main__': 00146 try: 00147 rostest.run('network_monitor_udp', 'blahblah', ConstantBandwidthTest) 00148 except KeyboardInterrupt, e: 00149 pass