parameter_test.py
Go to the documentation of this file.
00001 #! /usr/bin/env python
00002 
00003 import sys
00004 import time
00005 import subprocess
00006 import unittest
00007 
00008 import roslib; roslib.load_manifest('network_monitor_udp')
00009 import rospy
00010 import rostest 
00011 
00012 from network_monitor_udp.linktest import UdpmonsourceHandle
00013 from network_monitor_udp.linktest import LinkTest
00014 from network_monitor_udp.msg import LinktestGoal
00015 
00016 import scapy.all as scapy
00017 from tc_port_control import TcPortControl
00018 
00019 class ParameterTest(unittest.TestCase):
00020     def __init__(self, *args):
00021         super(ParameterTest, self).__init__(*args)
00022         rospy.init_node('network_monitor_udp_test')
00023         self.srcnode = UdpmonsourceHandle('performance_test')
00024         self.tc = TcPortControl(self)
00025         self.tc.reset()
00026 
00027     def setUp(self):
00028         self.srcnode.cancel_all_tests()
00029         self.tc.init()
00030 
00031     def tearDown(self):
00032         self.tc.reset()
00033 
00034     def test_tos(self):
00035         time.sleep(1.0)
00036 
00037         test1 = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 1500, duration = 1.0,
00038                                          sink_ip = "127.0.0.1", sink_port = 12345,
00039                                          bw_type = LinktestGoal.BW_CONSTANT, 
00040                                          update_interval = 0.2, tos = 0x10)
00041         test2 = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 1500, duration = 1.0,
00042                                          sink_ip = "127.0.0.1", sink_port = 12346,
00043                                          bw_type = LinktestGoal.BW_CONSTANT, 
00044                                          update_interval = 0.2, tos = 0x11)
00045         
00046         test1.start()
00047         test2.start()
00048 
00049         s = scapy.L2Socket(iface='lo', filter='udp and port 12345')
00050 
00051         while True:
00052             pkt = s.recv(2048)
00053             if pkt is None:
00054                 continue
00055             if pkt.type != 0x800:
00056                 print "Unexpected protocol 0x%04x"%pkt.type
00057                 continue
00058             if pkt.dport == 12345:
00059                 tos = pkt.payload.tos
00060                 break
00061 
00062         self.assertEqual(tos, 0x10,
00063                          "Unexpected TOS value for test1" + str(pkt))
00064 
00065         s = scapy.L2Socket(iface='lo', filter='udp and port 12346')
00066 
00067         while True:
00068             pkt = s.recv(2048)
00069             if pkt is None:
00070                 continue
00071             if pkt.type != 0x800:
00072                 print "Unexpected protocol 0x%04x"%pkt.type
00073                 continue
00074             if pkt.dport == 12346:
00075                 tos = pkt.payload.tos
00076                 break
00077 
00078         self.assertEqual(tos, 0x11,
00079                          "Unexpected TOS value for test2: " + str(pkt))
00080 
00081     def test_latencybins(self):
00082         self.tc.set_latency_loss(latency = 0.15)
00083         test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 500, duration = 1.5,
00084                                          sink_ip = "127.0.0.1", sink_port = 12345,
00085                                          bw_type = LinktestGoal.BW_CONSTANT, 
00086                                          update_interval = 0.2)
00087         test.start()
00088         time.sleep(2.0)
00089         self.assertTrue(test.done, "Expected test to have ended")
00090         self.assertTrue(test.overall_loss > 90.0,
00091                         "Expected 100%% loss, instead it was %.2f%%"%(test.overall_loss))
00092         self.assertTrue(test.latency_histogram[-1] > 0.95 and sum(test.latency_histogram[:-1]) < 0.05,
00093                         "Expected all packets to be in the last bin (100ms, infinity)")
00094                     
00095         test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 500, duration = 1.5,
00096                                         sink_ip = "127.0.0.1", sink_port = 12345,
00097                                         bw_type = LinktestGoal.BW_CONSTANT, 
00098                                         update_interval = 0.2, latencybins = [ 0.01, 0.1, 0.5 ])
00099         test.start()
00100         time.sleep(2.0)
00101         self.assertTrue(test.done, "Expected test to have ended")
00102         self.assertTrue( test.overall_loss < 2.0,
00103                          "Expected 0%% loss, instead it was %.2f%%"%
00104                          (test.overall_loss))
00105 
00106     def test_max_return_time(self):
00107         self.tc.set_latency_loss(latency = 0.075)
00108         self.tc.set_latency_loss_udp_returnpath(latency = 0.175)
00109 
00110         test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 500, duration = 1.0,
00111                                          sink_ip = "127.0.0.1", sink_port = 12345,
00112                                          bw_type = LinktestGoal.BW_CONSTANT, 
00113                                          update_interval = 0.05, ros_returnpath = False) 
00114         test.start()
00115         time.sleep(1.5)
00116         self.assertTrue(test.done, "Expected test to have ended")
00117         self.assertEqual(test.latency.min(), 0.0,
00118                         "Expected min latency to be 0.0ms meaning that for at least one interval "
00119                         "(in fact, this should hold for all intervals) no packets were restricted [1]")
00120                    
00121         
00122         test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 500, duration = 1.0,
00123                                         sink_ip = "127.0.0.1", sink_port = 12345,
00124                                         bw_type = LinktestGoal.BW_CONSTANT, 
00125                                         update_interval = 0.05, max_return_time = 0.01)
00126         test.start()
00127         time.sleep(1.5)
00128         self.assertTrue(test.done, "Expected test to have ended")
00129         self.assertEqual(test.latency.min(), 0.0,
00130                         "Expected min latency to be 0.0ms meaning that for at least one interval "
00131                         "(in fact, this should hold for all intervals) no packets were restricted [1]")
00132 
00133         test = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 500, duration = 1.0,
00134                                         sink_ip = "127.0.0.1", sink_port = 12345,
00135                                         bw_type = LinktestGoal.BW_CONSTANT, 
00136                                         update_interval = 0.05, max_return_time = 0.225)
00137         test.start()
00138         time.sleep(1.5)
00139         self.assertTrue(test.done, "Expected test to have ended")
00140         self.assertTrue(test.latency.min() > 0.05,
00141                         "Expected min latency to be ~75ms (meaning that there was no interval which had "
00142                         " no restricted packets), instead it was %.2fms"%
00143                         (test.latency.min()))
00144         self.assertTrue(test.loss.min() < 2.0,
00145                         "Expected 0%% loss during all intervals, instead for one interval it was %.2f%%"%
00146                         (test.loss.min()) )
00147 
00148     def test_udp_return(self):
00149         self.tc.set_latency_loss_udp_returnpath(loss = 20.0)
00150 
00151         test = self.srcnode.create_test(bw = 2.0*10**6, pktsize = 500, duration = 1.0,
00152                                         sink_ip = "127.0.0.1", sink_port = 12345,
00153                                         bw_type = LinktestGoal.BW_CONSTANT, 
00154                                         update_interval = 0.05, ros_returnpath = False) 
00155         test.start()
00156         time.sleep(1.5)
00157         self.assertTrue(test.done, "Expected test to have ended")
00158         self.assertTrue(test.loss.movavg() > 10.0,
00159                         "Expected loss to be ~20%% on UDP return path instead it was %.2f%%"%
00160                         (test.loss.movavg()))
00161 
00162         test = self.srcnode.create_test(bw = 2.0*10**6, pktsize = 500, duration = 1.0,
00163                                         sink_ip = "127.0.0.1", sink_port = 12345,
00164                                         bw_type = LinktestGoal.BW_CONSTANT, 
00165                                         update_interval = 0.05, ros_returnpath = True) 
00166         test.start()
00167         time.sleep(1.5)
00168         self.assertTrue(test.done, "Expected test to have ended")
00169         self.assertTrue(test.loss.movavg() < 2.0,
00170                         "Expected loss to be ~0%% with ROS return path instead it was %.2f%%"%
00171                         (test.loss.movavg()))
00172 
00173     def test_roundtrip(self):
00174         self.tc.set_latency_loss_udp_returnpath(latency = 0.05)
00175         
00176         test = self.srcnode.create_test(bw = 2.0*10**6, pktsize = 500, duration = 1.0,
00177                                         sink_ip = "127.0.0.1", sink_port = 12345,
00178                                         bw_type = LinktestGoal.BW_CONSTANT, 
00179                                         update_interval = 0.05, ros_returnpath = False,
00180                                         roundtrip = False) 
00181         test.start()
00182         time.sleep(1.5)
00183         self.assertTrue(test.done, "Expected test to have ended")
00184         self.assertTrue(test.latency.movavg() < 0.01,
00185                         "Expected latency to be < 1ms, instead it was %.2fms"%
00186                         (test.latency.movavg() * 1e3))
00187 
00188         test = self.srcnode.create_test(bw = 2.0*10**6, pktsize = 500, duration = 1.0,
00189                                         sink_ip = "127.0.0.1", sink_port = 12345,
00190                                         bw_type = LinktestGoal.BW_CONSTANT, 
00191                                         update_interval = 0.05, ros_returnpath = False,
00192                                         roundtrip = True) 
00193         test.start()
00194         time.sleep(1.5)
00195         self.assertTrue(test.done, "Expected test to have ended")
00196         self.assertTrue(test.latency.movavg() > 0.03,
00197                         "Expected latency to be ~50ms, instead it was %.2fms"%
00198                         (test.latency.movavg() * 1e3))
00199 
00200 if __name__ == '__main__':
00201     try:
00202         rostest.run('network_monitor_udp', 'parameter_test', ParameterTest)
00203     except KeyboardInterrupt, e:
00204         pass


network_control_tests
Author(s): Catalin Drula
autogenerated on Wed Sep 16 2015 04:38:36