00001
00002
00003 import sys
00004 import time
00005 import subprocess
00006 import unittest
00007
00008 import roslib; roslib.load_manifest('network_monitor_udp')
00009 import rospy
00010 import rostest
00011
00012 from network_monitor_udp.linktest import UdpmonsourceHandle
00013 from network_monitor_udp.linktest import LinkTest
00014 from network_monitor_udp.msg import LinktestGoal
00015
00016 from tc_port_control import TcPortControl
00017
00018 class ConstantBandwidthTest(unittest.TestCase):
00019 def __init__(self, *args):
00020 super(ConstantBandwidthTest, self).__init__(*args)
00021 rospy.init_node('network_monitor_udp_test')
00022 self.srcnode = UdpmonsourceHandle('performance_test')
00023 self.tc = TcPortControl(self)
00024 self.tc.reset()
00025
00026 def setUp(self):
00027 self.srcnode.cancel_all_tests()
00028 self.tc.init()
00029
00030 def tearDown(self):
00031 self.tc.reset()
00032
00033 def test_basic(self):
00034 test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 1500, duration = 3.0,
00035 sink_ip = "127.0.0.1", sink_port = 12345,
00036 bw_type = LinktestGoal.BW_CONSTANT)
00037 test.start()
00038 time.sleep(3.5)
00039 self.assertTrue(test.done, "Test should have finished already")
00040 self.assertTrue(test.overall_latency > 0.0 and test.overall_latency < 0.005,
00041 "Expected latency on loopback interface to be positive and under 5ms, instead it was %.2fms"%
00042 (test.overall_latency * 1000))
00043 self.assertAlmostEqual(test.overall_loss, 0.0, 2,
00044 "Expected packet loss on loopback interface to be zero, instead it was %.2f%%"%
00045 (test.overall_loss))
00046 self.assertTrue(test.overall_bandwidth > 0.9 * 10**6,
00047 "Expected useful bandwidth on loopback interface to be at least 0.9Mbit/s, instead it was %.2fMbit/s"%
00048 (test.overall_bandwidth/1e6))
00049
00050
00051 def test_bw_measurement(self):
00052 test = self.srcnode.create_test(bw = 3.0*10**6, pktsize = 1500, duration = 5.0,
00053 sink_ip = "127.0.0.1", sink_port = 12345,
00054 bw_type = LinktestGoal.BW_CONSTANT)
00055 self.tc.set_rate_limit(1e6)
00056 test.start()
00057 time.sleep(5.5)
00058 self.assertTrue(test.done, "Test should have finished already")
00059 self.assertTrue(test.overall_bandwidth < 1e6 and test.overall_bandwidth > 0.9e6,
00060 "Expected useful bandwidth on loopback interface to be at least 0.9Mbit/s, instead it was %.2fMbit/s"%
00061 (test.overall_bandwidth/1e6))
00062
00063 def test_bw_measurement_variable_bw(self):
00064 test = self.srcnode.create_test(bw = 5.0*10**6, pktsize = 1500, duration = 5.0,
00065 sink_ip = "127.0.0.1", sink_port = 12345,
00066 bw_type = LinktestGoal.BW_CONSTANT, update_interval = 0.15)
00067 self.tc.set_rate_limit(1e6)
00068 test.start()
00069 time.sleep(2.0)
00070 self.assertTrue(test.bandwidth.movavg(5) < 1.15e6 and test.bandwidth.movavg(5) > 0.85e6,
00071 "Expected useful bandwidth on loopback interface to be ~1Mbit/s, instead it was %.2fMbit/s"%
00072 (test.bandwidth.movavg(5)/1e6))
00073 self.tc.set_rate_limit(4e6)
00074 time.sleep(2.0)
00075 self.assertTrue(test.bandwidth.movavg(5) < 4.2e6 and test.bandwidth.movavg(5) > 3.8e6,
00076 "Expected useful bandwidth on loopback interface to be ~4Mbit/s, instead it was %.2fMbit/s"%
00077 (test.bandwidth.movavg(5)/1e6))
00078 self.tc.set_rate_limit(0.25e6)
00079 time.sleep(2.0)
00080 self.assertTrue(test.bandwidth.movavg(5) < 0.35e6 and test.bandwidth.movavg(5) > 0.15e6,
00081 "Expected useful bandwidth on loopback interface to be ~0.25Mbit/s, instead it was %.2fMbit/s"%
00082 (test.bandwidth.movavg(5)/1e6))
00083
00084 def test_loss_measurement(self):
00085 test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 200, duration = 12.0,
00086 sink_ip = "127.0.0.1", sink_port = 12345,
00087 bw_type = LinktestGoal.BW_CONSTANT, update_interval = 0.15)
00088 self.tc.set_latency_loss(loss = 5)
00089 test.start()
00090 time.sleep(4.0)
00091 self.assertTrue(test.loss.movavg() < 8.5 and test.loss.movavg() > 1.5,
00092 "Expected packet loss on loopback interface to be ~5%%, instead it was %.2f%%"%
00093 (test.loss.movavg()))
00094 self.tc.set_latency_loss(loss = 20)
00095 time.sleep(4.0)
00096 self.assertTrue(test.loss.movavg() < 25.0 and test.loss.movavg() > 15.0,
00097 "Expected packet loss on loopback interface to be ~20%%, instead it was %.2f%%"%
00098 (test.loss.movavg()))
00099 self.tc.set_latency_loss(loss = 0)
00100 time.sleep(4.0)
00101 self.assertTrue(test.loss.movavg() < 1.0,
00102 "Expected packet loss on loopback interface to be ~0%%, instead it was %.2f%%"%
00103 (test.loss.movavg()))
00104
00105 def test_latency_measurement(self):
00106 test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 200, duration = 6.0,
00107 sink_ip = "127.0.0.1", sink_port = 12345,
00108 bw_type = LinktestGoal.BW_CONSTANT, update_interval = 0.2)
00109 test.start()
00110 time.sleep(1.5)
00111 lo_latency = test.latency.movavg(3)
00112 self.tc.set_latency_loss(latency = 0.02)
00113 time.sleep(1.5)
00114 added_latency = test.latency.movavg(3) - lo_latency
00115 self.assertTrue(added_latency > 0.015 and added_latency < 0.025,
00116 "Expected added latency on loopback interface to be ~20ms, instead it was %.2fms"%
00117 (added_latency * 1000))
00118 self.tc.set_latency_loss(latency = 0.06)
00119 time.sleep(1.5)
00120 added_latency = test.latency.movavg(3) - lo_latency
00121 self.assertTrue(added_latency > 0.052 and added_latency < 0.068,
00122 "Expected added latency on loopback interface to be ~60ms, instead it was %.2fms"%
00123 (added_latency * 1000))
00124 self.tc.set_latency_loss(latency = 0)
00125 time.sleep(1.5)
00126 added_latency = test.latency.movavg(3) - lo_latency
00127 self.assertTrue(added_latency < 0.002,
00128 "Expected added latency on loopback interface to be ~0ms, instead it was %.2fms"%
00129 (added_latency * 1000))
00130
00131 def test_latency_loss(self):
00132 test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 200, duration = 3.0,
00133 sink_ip = "127.0.0.1", sink_port = 12345,
00134 bw_type = LinktestGoal.BW_CONSTANT, update_interval = 0.2)
00135 self.tc.set_latency_loss(loss = 5, latency = 0.05)
00136 test.start()
00137 time.sleep(3.5)
00138 self.assertTrue(test.loss.movavg(10) < 7.5 and test.loss.movavg(10) > 2.5,
00139 "Expected packet loss on loopback interface to be ~5%%, instead it was %.2f%%"%
00140 (test.loss.movavg(10)))
00141 self.assertTrue(test.latency.movavg(10) < 0.07 and test.latency.movavg(10) > 0.05,
00142 "Expected latency on loopback interface to be ~60ms, instead it was %.2fms"%
00143 (test.latency.movavg(10)* 1000))
00144
00145 if __name__ == '__main__':
00146 try:
00147 rostest.run('network_monitor_udp', 'blahblah', ConstantBandwidthTest)
00148 except KeyboardInterrupt, e:
00149 pass