00001
00002
00003 import sys
00004 import time
00005 import subprocess
00006 import unittest
00007
00008 import roslib; roslib.load_manifest('network_monitor_udp')
00009 import rospy
00010 import rostest
00011
00012 from network_monitor_udp.linktest import UdpmonsourceHandle
00013 from network_monitor_udp.linktest import LinkTest
00014 from network_monitor_udp.msg import LinktestGoal
00015
00016 import scapy.all as scapy
00017 from tc_port_control import TcPortControl
00018
00019 class ParameterTest(unittest.TestCase):
00020 def __init__(self, *args):
00021 super(ParameterTest, self).__init__(*args)
00022 rospy.init_node('network_monitor_udp_test')
00023 self.srcnode = UdpmonsourceHandle('performance_test')
00024 self.tc = TcPortControl(self)
00025 self.tc.reset()
00026
00027 def setUp(self):
00028 self.srcnode.cancel_all_tests()
00029 self.tc.init()
00030
00031 def tearDown(self):
00032 self.tc.reset()
00033
00034 def test_tos(self):
00035 time.sleep(1.0)
00036
00037 test1 = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 1500, duration = 1.0,
00038 sink_ip = "127.0.0.1", sink_port = 12345,
00039 bw_type = LinktestGoal.BW_CONSTANT,
00040 update_interval = 0.2, tos = 0x10)
00041 test2 = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 1500, duration = 1.0,
00042 sink_ip = "127.0.0.1", sink_port = 12346,
00043 bw_type = LinktestGoal.BW_CONSTANT,
00044 update_interval = 0.2, tos = 0x11)
00045
00046 test1.start()
00047 test2.start()
00048
00049 s = scapy.L2Socket(iface='lo', filter='udp and port 12345')
00050
00051 while True:
00052 pkt = s.recv(2048)
00053 if pkt is None:
00054 continue
00055 if pkt.type != 0x800:
00056 print "Unexpected protocol 0x%04x"%pkt.type
00057 continue
00058 if pkt.dport == 12345:
00059 tos = pkt.payload.tos
00060 break
00061
00062 self.assertEqual(tos, 0x10,
00063 "Unexpected TOS value for test1" + str(pkt))
00064
00065 s = scapy.L2Socket(iface='lo', filter='udp and port 12346')
00066
00067 while True:
00068 pkt = s.recv(2048)
00069 if pkt is None:
00070 continue
00071 if pkt.type != 0x800:
00072 print "Unexpected protocol 0x%04x"%pkt.type
00073 continue
00074 if pkt.dport == 12346:
00075 tos = pkt.payload.tos
00076 break
00077
00078 self.assertEqual(tos, 0x11,
00079 "Unexpected TOS value for test2: " + str(pkt))
00080
00081 def test_latencybins(self):
00082 self.tc.set_latency_loss(latency = 0.15)
00083 test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 500, duration = 1.5,
00084 sink_ip = "127.0.0.1", sink_port = 12345,
00085 bw_type = LinktestGoal.BW_CONSTANT,
00086 update_interval = 0.2)
00087 test.start()
00088 time.sleep(2.0)
00089 self.assertTrue(test.done, "Expected test to have ended")
00090 self.assertTrue(test.overall_loss > 90.0,
00091 "Expected 100%% loss, instead it was %.2f%%"%(test.overall_loss))
00092 self.assertTrue(test.latency_histogram[-1] > 0.95 and sum(test.latency_histogram[:-1]) < 0.05,
00093 "Expected all packets to be in the last bin (100ms, infinity)")
00094
00095 test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 500, duration = 1.5,
00096 sink_ip = "127.0.0.1", sink_port = 12345,
00097 bw_type = LinktestGoal.BW_CONSTANT,
00098 update_interval = 0.2, latencybins = [ 0.01, 0.1, 0.5 ])
00099 test.start()
00100 time.sleep(2.0)
00101 self.assertTrue(test.done, "Expected test to have ended")
00102 self.assertTrue( test.overall_loss < 2.0,
00103 "Expected 0%% loss, instead it was %.2f%%"%
00104 (test.overall_loss))
00105
00106 def test_max_return_time(self):
00107 self.tc.set_latency_loss(latency = 0.075)
00108 self.tc.set_latency_loss_udp_returnpath(latency = 0.175)
00109
00110 test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 500, duration = 1.0,
00111 sink_ip = "127.0.0.1", sink_port = 12345,
00112 bw_type = LinktestGoal.BW_CONSTANT,
00113 update_interval = 0.05, ros_returnpath = False)
00114 test.start()
00115 time.sleep(1.5)
00116 self.assertTrue(test.done, "Expected test to have ended")
00117 self.assertEqual(test.latency.min(), 0.0,
00118 "Expected min latency to be 0.0ms meaning that for at least one interval "
00119 "(in fact, this should hold for all intervals) no packets were restricted [1]")
00120
00121
00122 test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 500, duration = 1.0,
00123 sink_ip = "127.0.0.1", sink_port = 12345,
00124 bw_type = LinktestGoal.BW_CONSTANT,
00125 update_interval = 0.05, max_return_time = 0.01)
00126 test.start()
00127 time.sleep(1.5)
00128 self.assertTrue(test.done, "Expected test to have ended")
00129 self.assertEqual(test.latency.min(), 0.0,
00130 "Expected min latency to be 0.0ms meaning that for at least one interval "
00131 "(in fact, this should hold for all intervals) no packets were restricted [1]")
00132
00133 test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 500, duration = 1.0,
00134 sink_ip = "127.0.0.1", sink_port = 12345,
00135 bw_type = LinktestGoal.BW_CONSTANT,
00136 update_interval = 0.05, max_return_time = 0.225)
00137 test.start()
00138 time.sleep(1.5)
00139 self.assertTrue(test.done, "Expected test to have ended")
00140 self.assertTrue(test.latency.min() > 0.05,
00141 "Expected min latency to be ~75ms (meaning that there was no interval which had "
00142 " no restricted packets), instead it was %.2fms"%
00143 (test.latency.min()))
00144 self.assertTrue(test.loss.min() < 2.0,
00145 "Expected 0%% loss during all intervals, instead for one interval it was %.2f%%"%
00146 (test.loss.min()) )
00147
00148 def test_udp_return(self):
00149 self.tc.set_latency_loss_udp_returnpath(loss = 20.0)
00150
00151 test = self.srcnode.create_test(bw = 2.0*10**6, pktsize = 500, duration = 1.0,
00152 sink_ip = "127.0.0.1", sink_port = 12345,
00153 bw_type = LinktestGoal.BW_CONSTANT,
00154 update_interval = 0.05, ros_returnpath = False)
00155 test.start()
00156 time.sleep(1.5)
00157 self.assertTrue(test.done, "Expected test to have ended")
00158 self.assertTrue(test.loss.movavg() > 10.0,
00159 "Expected loss to be ~20%% on UDP return path instead it was %.2f%%"%
00160 (test.loss.movavg()))
00161
00162 test = self.srcnode.create_test(bw = 2.0*10**6, pktsize = 500, duration = 1.0,
00163 sink_ip = "127.0.0.1", sink_port = 12345,
00164 bw_type = LinktestGoal.BW_CONSTANT,
00165 update_interval = 0.05, ros_returnpath = True)
00166 test.start()
00167 time.sleep(1.5)
00168 self.assertTrue(test.done, "Expected test to have ended")
00169 self.assertTrue(test.loss.movavg() < 2.0,
00170 "Expected loss to be ~0%% with ROS return path instead it was %.2f%%"%
00171 (test.loss.movavg()))
00172
00173 def test_roundtrip(self):
00174 self.tc.set_latency_loss_udp_returnpath(latency = 0.05)
00175
00176 test = self.srcnode.create_test(bw = 2.0*10**6, pktsize = 500, duration = 1.0,
00177 sink_ip = "127.0.0.1", sink_port = 12345,
00178 bw_type = LinktestGoal.BW_CONSTANT,
00179 update_interval = 0.05, ros_returnpath = False,
00180 roundtrip = False)
00181 test.start()
00182 time.sleep(1.5)
00183 self.assertTrue(test.done, "Expected test to have ended")
00184 self.assertTrue(test.latency.movavg() < 0.01,
00185 "Expected latency to be < 1ms, instead it was %.2fms"%
00186 (test.latency.movavg() * 1e3))
00187
00188 test = self.srcnode.create_test(bw = 2.0*10**6, pktsize = 500, duration = 1.0,
00189 sink_ip = "127.0.0.1", sink_port = 12345,
00190 bw_type = LinktestGoal.BW_CONSTANT,
00191 update_interval = 0.05, ros_returnpath = False,
00192 roundtrip = True)
00193 test.start()
00194 time.sleep(1.5)
00195 self.assertTrue(test.done, "Expected test to have ended")
00196 self.assertTrue(test.latency.movavg() > 0.03,
00197 "Expected latency to be ~50ms, instead it was %.2fms"%
00198 (test.latency.movavg() * 1e3))
00199
00200 if __name__ == '__main__':
00201 try:
00202 rostest.run('network_monitor_udp', 'parameter_test', ParameterTest)
00203 except KeyboardInterrupt, e:
00204 pass