$search
00001 #! /usr/bin/env python 00002 00003 import sys 00004 import time 00005 import subprocess 00006 import unittest 00007 00008 import roslib; roslib.load_manifest('network_monitor_udp') 00009 import rospy 00010 import rostest 00011 00012 from network_monitor_udp.linktest import UdpmonsourceHandle 00013 from network_monitor_udp.linktest import LinkTest 00014 from network_monitor_udp.msg import LinktestGoal 00015 00016 import scapy.all as scapy 00017 from tc_port_control import TcPortControl 00018 00019 class ParameterTest(unittest.TestCase): 00020 def __init__(self, *args): 00021 super(ParameterTest, self).__init__(*args) 00022 rospy.init_node('network_monitor_udp_test') 00023 self.srcnode = UdpmonsourceHandle('performance_test') 00024 self.tc = TcPortControl(self) 00025 self.tc.reset() 00026 00027 def setUp(self): 00028 self.srcnode.cancel_all_tests() 00029 self.tc.init() 00030 00031 def tearDown(self): 00032 self.tc.reset() 00033 00034 def test_tos(self): 00035 time.sleep(1.0) 00036 00037 test1 = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 1500, duration = 1.0, 00038 sink_ip = "127.0.0.1", sink_port = 12345, 00039 bw_type = LinktestGoal.BW_CONSTANT, 00040 update_interval = 0.2, tos = 0x10) 00041 test2 = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 1500, duration = 1.0, 00042 sink_ip = "127.0.0.1", sink_port = 12346, 00043 bw_type = LinktestGoal.BW_CONSTANT, 00044 update_interval = 0.2, tos = 0x11) 00045 00046 test1.start() 00047 test2.start() 00048 00049 s = scapy.L2Socket(iface='lo', filter='udp and port 12345') 00050 00051 while True: 00052 pkt = s.recv(2048) 00053 if pkt is None: 00054 continue 00055 if pkt.type != 0x800: 00056 print "Unexpected protocol 0x%04x"%pkt.type 00057 continue 00058 if pkt.dport == 12345: 00059 tos = pkt.payload.tos 00060 break 00061 00062 self.assertEqual(tos, 0x10, 00063 "Unexpected TOS value for test1" + str(pkt)) 00064 00065 s = scapy.L2Socket(iface='lo', filter='udp and port 12346') 00066 00067 while True: 00068 pkt = s.recv(2048) 00069 if pkt is None: 00070 continue 00071 if pkt.type != 0x800: 00072 print "Unexpected protocol 0x%04x"%pkt.type 00073 continue 00074 if pkt.dport == 12346: 00075 tos = pkt.payload.tos 00076 break 00077 00078 self.assertEqual(tos, 0x11, 00079 "Unexpected TOS value for test2: " + str(pkt)) 00080 00081 def test_latencybins(self): 00082 self.tc.set_latency_loss(latency = 0.15) 00083 test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 500, duration = 1.5, 00084 sink_ip = "127.0.0.1", sink_port = 12345, 00085 bw_type = LinktestGoal.BW_CONSTANT, 00086 update_interval = 0.2) 00087 test.start() 00088 time.sleep(2.0) 00089 self.assertTrue(test.done, "Expected test to have ended") 00090 self.assertTrue(test.overall_loss > 90.0, 00091 "Expected 100%% loss, instead it was %.2f%%"%(test.overall_loss)) 00092 self.assertTrue(test.latency_histogram[-1] > 0.95 and sum(test.latency_histogram[:-1]) < 0.05, 00093 "Expected all packets to be in the last bin (100ms, infinity)") 00094 00095 test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 500, duration = 1.5, 00096 sink_ip = "127.0.0.1", sink_port = 12345, 00097 bw_type = LinktestGoal.BW_CONSTANT, 00098 update_interval = 0.2, latencybins = [ 0.01, 0.1, 0.5 ]) 00099 test.start() 00100 time.sleep(2.0) 00101 self.assertTrue(test.done, "Expected test to have ended") 00102 self.assertTrue( test.overall_loss < 2.0, 00103 "Expected 0%% loss, instead it was %.2f%%"% 00104 (test.overall_loss)) 00105 00106 def test_max_return_time(self): 00107 self.tc.set_latency_loss(latency = 0.075) 00108 self.tc.set_latency_loss_udp_returnpath(latency = 0.175) 00109 00110 test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 500, duration = 1.0, 00111 sink_ip = "127.0.0.1", sink_port = 12345, 00112 bw_type = LinktestGoal.BW_CONSTANT, 00113 update_interval = 0.05, ros_returnpath = False) 00114 test.start() 00115 time.sleep(1.5) 00116 self.assertTrue(test.done, "Expected test to have ended") 00117 self.assertEqual(test.latency.min(), 0.0, 00118 "Expected min latency to be 0.0ms meaning that for at least one interval " 00119 "(in fact, this should hold for all intervals) no packets were restricted [1]") 00120 00121 00122 test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 500, duration = 1.0, 00123 sink_ip = "127.0.0.1", sink_port = 12345, 00124 bw_type = LinktestGoal.BW_CONSTANT, 00125 update_interval = 0.05, max_return_time = 0.01) 00126 test.start() 00127 time.sleep(1.5) 00128 self.assertTrue(test.done, "Expected test to have ended") 00129 self.assertEqual(test.latency.min(), 0.0, 00130 "Expected min latency to be 0.0ms meaning that for at least one interval " 00131 "(in fact, this should hold for all intervals) no packets were restricted [1]") 00132 00133 test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 500, duration = 1.0, 00134 sink_ip = "127.0.0.1", sink_port = 12345, 00135 bw_type = LinktestGoal.BW_CONSTANT, 00136 update_interval = 0.05, max_return_time = 0.225) 00137 test.start() 00138 time.sleep(1.5) 00139 self.assertTrue(test.done, "Expected test to have ended") 00140 self.assertTrue(test.latency.min() > 0.05, 00141 "Expected min latency to be ~75ms (meaning that there was no interval which had " 00142 " no restricted packets), instead it was %.2fms"% 00143 (test.latency.min())) 00144 self.assertTrue(test.loss.min() < 2.0, 00145 "Expected 0%% loss during all intervals, instead for one interval it was %.2f%%"% 00146 (test.loss.min()) ) 00147 00148 def test_udp_return(self): 00149 self.tc.set_latency_loss_udp_returnpath(loss = 20.0) 00150 00151 test = self.srcnode.create_test(bw = 2.0*10**6, pktsize = 500, duration = 1.0, 00152 sink_ip = "127.0.0.1", sink_port = 12345, 00153 bw_type = LinktestGoal.BW_CONSTANT, 00154 update_interval = 0.05, ros_returnpath = False) 00155 test.start() 00156 time.sleep(1.5) 00157 self.assertTrue(test.done, "Expected test to have ended") 00158 self.assertTrue(test.loss.movavg() > 10.0, 00159 "Expected loss to be ~20%% on UDP return path instead it was %.2f%%"% 00160 (test.loss.movavg())) 00161 00162 test = self.srcnode.create_test(bw = 2.0*10**6, pktsize = 500, duration = 1.0, 00163 sink_ip = "127.0.0.1", sink_port = 12345, 00164 bw_type = LinktestGoal.BW_CONSTANT, 00165 update_interval = 0.05, ros_returnpath = True) 00166 test.start() 00167 time.sleep(1.5) 00168 self.assertTrue(test.done, "Expected test to have ended") 00169 self.assertTrue(test.loss.movavg() < 2.0, 00170 "Expected loss to be ~0%% with ROS return path instead it was %.2f%%"% 00171 (test.loss.movavg())) 00172 00173 def test_roundtrip(self): 00174 self.tc.set_latency_loss_udp_returnpath(latency = 0.05) 00175 00176 test = self.srcnode.create_test(bw = 2.0*10**6, pktsize = 500, duration = 1.0, 00177 sink_ip = "127.0.0.1", sink_port = 12345, 00178 bw_type = LinktestGoal.BW_CONSTANT, 00179 update_interval = 0.05, ros_returnpath = False, 00180 roundtrip = False) 00181 test.start() 00182 time.sleep(1.5) 00183 self.assertTrue(test.done, "Expected test to have ended") 00184 self.assertTrue(test.latency.movavg() < 0.01, 00185 "Expected latency to be < 1ms, instead it was %.2fms"% 00186 (test.latency.movavg() * 1e3)) 00187 00188 test = self.srcnode.create_test(bw = 2.0*10**6, pktsize = 500, duration = 1.0, 00189 sink_ip = "127.0.0.1", sink_port = 12345, 00190 bw_type = LinktestGoal.BW_CONSTANT, 00191 update_interval = 0.05, ros_returnpath = False, 00192 roundtrip = True) 00193 test.start() 00194 time.sleep(1.5) 00195 self.assertTrue(test.done, "Expected test to have ended") 00196 self.assertTrue(test.latency.movavg() > 0.03, 00197 "Expected latency to be ~50ms, instead it was %.2fms"% 00198 (test.latency.movavg() * 1e3)) 00199 00200 if __name__ == '__main__': 00201 try: 00202 rostest.run('network_monitor_udp', 'parameter_test', ParameterTest) 00203 except KeyboardInterrupt, e: 00204 pass