00001
00002
00003 import sys
00004 import time
00005 import subprocess
00006 import unittest
00007
00008 import roslib; roslib.load_manifest('network_monitor_udp')
00009 import rospy
00010 import rostest
00011
00012 from network_monitor_udp.linktest import UdpmonsourceHandle
00013 from network_monitor_udp.linktest import LinkTest
00014 from network_monitor_udp.msg import LinktestGoal
00015
00016 from tc_port_control import TcPortControl
00017
00018 class MultipleEndsTest(unittest.TestCase):
00019 def __init__(self, *args):
00020 super(MultipleEndsTest, self).__init__(*args)
00021 rospy.init_node('network_monitor_udp_test')
00022 self.srcnode = UdpmonsourceHandle('performance_test')
00023 self.tc = TcPortControl(self)
00024 self.tc.reset()
00025
00026 def setUp(self):
00027 self.srcnode.cancel_all_tests()
00028 self.tc.init()
00029
00030 def tearDown(self):
00031 self.tc.reset()
00032
00033 def test_multiple_sources_single_sink(self):
00034 self.tc.set_rate_limit(1e6)
00035
00036 test1 = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 1500, duration = 5.0,
00037 sink_ip = "127.0.0.1", sink_port = 12345,
00038 bw_type = LinktestGoal.BW_CONSTANT,
00039 update_interval = 0.2)
00040 test2 = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 1500, duration = 10.0,
00041 sink_ip = "127.0.0.1", sink_port = 12345,
00042 bw_type = LinktestGoal.BW_CONSTANT,
00043 update_interval = 0.2)
00044 test3 = self.srcnode.create_test(bw = 1.0*10**6, pktsize = 1500, duration = 15.0,
00045 sink_ip = "127.0.0.1", sink_port = 12345,
00046 bw_type = LinktestGoal.BW_CONSTANT,
00047 update_interval = 0.2)
00048 test1.start()
00049 test2.start()
00050 test3.start()
00051
00052 time.sleep(3.0)
00053
00054 bwsum = test1.bandwidth.movavg(5) + test2.bandwidth.movavg(5) + test3.bandwidth.movavg(5)
00055 self.assertTrue(bwsum > 0.8e6 and bwsum < 1.2e6,
00056 "Expected the combined bandwidth of three tests to be ~1Mbit/s, instead it was %.2fMbit/s"%
00057 (bwsum))
00058
00059 time.sleep(2.5)
00060
00061 self.assertTrue(test1.done,
00062 "Expected test1 to have finished at time = 5.5s")
00063 self.assertFalse(test2.done,
00064 "Expected test2 to be still running at time = 5.5s")
00065 self.assertFalse(test3.done,
00066 "Expected test3 to be still running at time = 5.5s")
00067
00068 time.sleep(2.5)
00069
00070 bwsum = test2.bandwidth.movavg(5) + test3.bandwidth.movavg(5)
00071 self.assertTrue(bwsum > 0.8e6 and bwsum < 1.2e6,
00072 "Expected the combined bandwidth of two tests to be ~1Mbit/s, instead it was %.2fMbit/s"%
00073 (bwsum))
00074
00075 time.sleep(2.5)
00076
00077 self.assertTrue(test2.done,
00078 "Expected test2 to have finished at time = 10.5s")
00079 self.assertFalse(test3.done,
00080 "Expected test3 to be still running at time = 10.5s")
00081
00082 time.sleep(2.5)
00083
00084 self.assertTrue(test3.bandwidth.movavg(5) > 0.80e6 and test3.bandwidth.movavg(5) < 1.20e6,
00085 "Expected test3 bandwidth to be ~1Mbit/s (with 1 test running concurrently)" +
00086 ", instead it was %.2fMbit/s"%
00087 (test3.bandwidth.movavg(5)/1e6))
00088
00089 time.sleep(2.5)
00090
00091 self.assertTrue(test2.done,
00092 "Expected test2 to have finished at time = 10.5s")
00093
00094 self.assertTrue(test3.latency.duration() > test2.latency.duration() and
00095 test2.latency.duration() > test1.latency.duration(),
00096 "Expected test3 duration (%.2fs) to be greater than test2 (%.2fs)"
00097 " and still greater than test1 (%.2fs)"%
00098 (test3.latency.duration(), test2.latency.duration(), test1.latency.duration()))
00099
00100 def test_multiple_sinks(self):
00101 self.tc.set_rate_limit(1e6)
00102
00103 test1 = self.srcnode.create_test(bw = 5.0*10**6, pktsize = 1500, duration = 3.0,
00104 sink_ip = "127.0.0.1", sink_port = 12345,
00105 bw_type = LinktestGoal.BW_CONSTANT,
00106 update_interval = 0.2)
00107 test2 = self.srcnode.create_test(bw = 5.0*10**6, pktsize = 1500, duration = 3.0,
00108 sink_ip = "127.0.0.1", sink_port = 12346,
00109 bw_type = LinktestGoal.BW_CONSTANT,
00110 update_interval = 0.2)
00111 test1.start()
00112 test2.start()
00113 time.sleep(3.5)
00114
00115 self.assertTrue(test1.done, "Expected test2 to have finished at time = 3.5s")
00116 self.assertTrue(test2.done, "Expected test2 to have finished at time = 3.5s")
00117
00118 self.assertTrue(test2.overall_bandwidth > 4 * test1.overall_bandwidth,
00119 "Expected test2 bw (%.2fMbit/s) to be at least 4x larger"
00120 " than test1 bw (%.2fMbit/s)"%
00121 (test2.overall_bandwidth/1e6, test1.overall_bandwidth/1e6))
00122
00123 if __name__ == '__main__':
00124 try:
00125 rostest.run('network_monitor_udp', 'multiple_ends_test', MultipleEndsTest)
00126 except KeyboardInterrupt, e:
00127 pass