Go to the documentation of this file.00001
00002
00003 import sys
00004 import time
00005 import subprocess
00006 import unittest
00007
00008 import roslib; roslib.load_manifest('network_monitor_udp')
00009 import rospy
00010 import rostest
00011
00012 from network_monitor_udp.linktest import UdpmonsourceHandle
00013 from network_monitor_udp.linktest import LinkTest
00014 from network_monitor_udp.msg import LinktestGoal
00015
00016 class BasicTest(unittest.TestCase):
00017 def __init__(self, *args):
00018 super(BasicTest, self).__init__(*args)
00019 rospy.init_node('network_monitor_udp_test')
00020 self.srcnode = UdpmonsourceHandle('performance_test')
00021
00022 def setUp(self):
00023 self.srcnode.cancel_all_tests()
00024
00025 def test_finite_duration(self):
00026 scheduled_duration = 3.0
00027
00028 test = self.srcnode.create_test("finite_duration_test",
00029 sink_ip = "127.0.0.1", sink_port = 12345,
00030 duration = scheduled_duration)
00031 start_time = time.time()
00032 test_duration = 0.0
00033 test.start()
00034 while not test.done and test_duration < scheduled_duration + 2.0:
00035 time.sleep(0.2)
00036 test_duration = time.time() - start_time
00037
00038 self.assertTrue( test.done, "Test did not end within time limit. " +
00039 "Scheduled duration was %.2fsec while elapsed duration so far is %.2fsec"%
00040 (scheduled_duration, test_duration) )
00041
00042 self.assertTrue( time.time() - start_time > scheduled_duration - 0.5,
00043 "Test ended too fast, it was scheduled for %.2fsec and it lasted for only %.2fsec"%
00044 (scheduled_duration, time.time() - start_time) )
00045
00046 def run_periodic_feedback_test(self, update_interval, test_duration = 5.0):
00047 self.last_feedback_time = None
00048 self.feedback_interval_too_small = False
00049 self.feedback_interval_too_large = False
00050 self.feedback_interval_too_small_value = 0.0
00051 self.feedback_interval_too_large_value = 0.0
00052 self.feedback_calls = 0
00053 self.update_interval = update_interval
00054
00055 test = self.srcnode.create_test(sink_ip = "127.0.0.1", sink_port = 12345,
00056 duration = test_duration, update_interval = update_interval)
00057 test.set_custom_feedback_handler(self.feedback_period_test_handler)
00058 test.start()
00059
00060 while not test.done:
00061 time.sleep(1.0)
00062
00063 self.assertFalse(self.feedback_interval_too_large,
00064 "Interval between two feedback calls was too large: expected (%.2f sec) was (%.2f sec)"%
00065 (self.update_interval, self.feedback_interval_too_large_value) )
00066
00067 self.assertFalse(self.feedback_interval_too_small,
00068 "Interval between two feedback calls was too small: expected (%.2f sec) was (%.2f sec)"%
00069 (self.update_interval, self.feedback_interval_too_small_value) )
00070
00071 conservative_expected_feedback_calls = int((test_duration - 1.0)/self.update_interval)
00072 self.assertTrue(self.feedback_calls >= conservative_expected_feedback_calls,
00073 "Expected at least (%d) feedback calls, but got only (%d)"%
00074 (conservative_expected_feedback_calls, self.feedback_calls))
00075
00076 def test_feedback_period(self):
00077 self.run_periodic_feedback_test(0.5, 5.0)
00078 self.run_periodic_feedback_test(0.2, 3.0)
00079 self.run_periodic_feedback_test(0.05, 3.0)
00080
00081 def feedback_period_test_handler(self, gh, feedback):
00082 self.feedback_calls += 1
00083 curr_time = time.time()
00084 if self.last_feedback_time is not None:
00085 if curr_time - self.last_feedback_time > self.update_interval * 1.5:
00086 self.feedback_interval_too_large = True
00087 self.feedback_interval_too_large_value = curr_time - self.last_feedback_time
00088 elif curr_time - self.last_feedback_time < self.update_interval * 0.5:
00089 self.feedback_interval_too_small = True
00090 self.feedback_interval_too_small_value = curr_time - self.last_feedback_time
00091 self.last_feedback_time = curr_time
00092
00093 def test_stop_test(self):
00094 test = self.srcnode.create_test("finite_duration_test",
00095 sink_ip = "127.0.0.1", sink_port = 12345,
00096 duration = 5.0, bw = 1e6 )
00097 test.start()
00098 time.sleep(1.0)
00099 test.stop()
00100 time.sleep(1.0)
00101
00102 self.assertTrue(test.done,
00103 "Test should have been stopped")
00104 self.assertTrue(test.overall_bandwidth > 0.6e6,
00105 "Expected overall bandwidth to be ~1Mbit/s, instead it was %.2fMbit/s"%
00106 (test.overall_bandwidth/1e6))
00107
00108
00109 if __name__ == '__main__':
00110 try:
00111 rostest.run('network_monitor_udp', 'blahblah', BasicTest)
00112 except KeyboardInterrupt, e:
00113 pass