$search
00001 #! /usr/bin/env python 00002 00003 import sys 00004 import time 00005 import subprocess 00006 import unittest 00007 00008 import roslib; roslib.load_manifest('network_monitor_udp') 00009 import rospy 00010 import rostest 00011 00012 from network_monitor_udp.linktest import UdpmonsourceHandle 00013 from network_monitor_udp.linktest import LinkTest 00014 from network_monitor_udp.msg import LinktestGoal 00015 00016 class BasicTest(unittest.TestCase): 00017 def __init__(self, *args): 00018 super(BasicTest, self).__init__(*args) 00019 rospy.init_node('network_monitor_udp_test') 00020 self.srcnode = UdpmonsourceHandle('performance_test') 00021 00022 def setUp(self): 00023 self.srcnode.cancel_all_tests() 00024 00025 def test_finite_duration(self): 00026 scheduled_duration = 3.0 # seconds 00027 00028 test = self.srcnode.create_test("finite_duration_test", 00029 sink_ip = "127.0.0.1", sink_port = 12345, 00030 duration = scheduled_duration) 00031 start_time = time.time() 00032 test_duration = 0.0 00033 test.start() 00034 while not test.done and test_duration < scheduled_duration + 2.0: 00035 time.sleep(0.2) 00036 test_duration = time.time() - start_time 00037 00038 self.assertTrue( test.done, "Test did not end within time limit. " + 00039 "Scheduled duration was %.2fsec while elapsed duration so far is %.2fsec"% 00040 (scheduled_duration, test_duration) ) 00041 00042 self.assertTrue( time.time() - start_time > scheduled_duration - 0.5, 00043 "Test ended too fast, it was scheduled for %.2fsec and it lasted for only %.2fsec"% 00044 (scheduled_duration, time.time() - start_time) ) 00045 00046 def run_periodic_feedback_test(self, update_interval, test_duration = 5.0): 00047 self.last_feedback_time = None 00048 self.feedback_interval_too_small = False 00049 self.feedback_interval_too_large = False 00050 self.feedback_interval_too_small_value = 0.0 00051 self.feedback_interval_too_large_value = 0.0 00052 self.feedback_calls = 0 00053 self.update_interval = update_interval 00054 00055 test = self.srcnode.create_test(sink_ip = "127.0.0.1", sink_port = 12345, 00056 duration = test_duration, update_interval = update_interval) 00057 test.set_custom_feedback_handler(self.feedback_period_test_handler) 00058 test.start() 00059 00060 while not test.done: 00061 time.sleep(1.0) 00062 00063 self.assertFalse(self.feedback_interval_too_large, 00064 "Interval between two feedback calls was too large: expected (%.2f sec) was (%.2f sec)"% 00065 (self.update_interval, self.feedback_interval_too_large_value) ) 00066 00067 self.assertFalse(self.feedback_interval_too_small, 00068 "Interval between two feedback calls was too small: expected (%.2f sec) was (%.2f sec)"% 00069 (self.update_interval, self.feedback_interval_too_small_value) ) 00070 00071 conservative_expected_feedback_calls = int((test_duration - 1.0)/self.update_interval) 00072 self.assertTrue(self.feedback_calls >= conservative_expected_feedback_calls, 00073 "Expected at least (%d) feedback calls, but got only (%d)"% 00074 (conservative_expected_feedback_calls, self.feedback_calls)) 00075 00076 def test_feedback_period(self): 00077 self.run_periodic_feedback_test(0.5, 5.0) 00078 self.run_periodic_feedback_test(0.2, 3.0) 00079 self.run_periodic_feedback_test(0.05, 3.0) 00080 00081 def feedback_period_test_handler(self, gh, feedback): 00082 self.feedback_calls += 1 00083 curr_time = time.time() 00084 if self.last_feedback_time is not None: 00085 if curr_time - self.last_feedback_time > self.update_interval * 1.5: 00086 self.feedback_interval_too_large = True 00087 self.feedback_interval_too_large_value = curr_time - self.last_feedback_time 00088 elif curr_time - self.last_feedback_time < self.update_interval * 0.5: 00089 self.feedback_interval_too_small = True 00090 self.feedback_interval_too_small_value = curr_time - self.last_feedback_time 00091 self.last_feedback_time = curr_time 00092 00093 def test_stop_test(self): 00094 test = self.srcnode.create_test("finite_duration_test", 00095 sink_ip = "127.0.0.1", sink_port = 12345, 00096 duration = 5.0, bw = 1e6 ) 00097 test.start() 00098 time.sleep(1.0) 00099 test.stop() 00100 time.sleep(1.0) 00101 00102 self.assertTrue(test.done, 00103 "Test should have been stopped") 00104 self.assertTrue(test.overall_bandwidth > 0.6e6, 00105 "Expected overall bandwidth to be ~1Mbit/s, instead it was %.2fMbit/s"% 00106 (test.overall_bandwidth/1e6)) 00107 00108 00109 if __name__ == '__main__': 00110 try: 00111 rostest.run('network_monitor_udp', 'blahblah', BasicTest) 00112 except KeyboardInterrupt, e: 00113 pass