basic_test.py
Go to the documentation of this file.
00001 #! /usr/bin/env python
00002 
00003 import sys
00004 import time
00005 import subprocess
00006 import unittest
00007 
00008 import roslib; roslib.load_manifest('network_monitor_udp')
00009 import rospy
00010 import rostest 
00011 
00012 from network_monitor_udp.linktest import UdpmonsourceHandle
00013 from network_monitor_udp.linktest import LinkTest
00014 from network_monitor_udp.msg import LinktestGoal
00015 
00016 class BasicTest(unittest.TestCase):
00017     def __init__(self, *args):
00018         super(BasicTest, self).__init__(*args)
00019         rospy.init_node('network_monitor_udp_test')
00020         self.srcnode = UdpmonsourceHandle('performance_test')
00021 
00022     def setUp(self):
00023         self.srcnode.cancel_all_tests()
00024         
00025     def test_finite_duration(self):
00026         scheduled_duration = 3.0 # seconds
00027 
00028         test = self.srcnode.create_test("finite_duration_test", 
00029                                         sink_ip = "127.0.0.1", sink_port = 12345, 
00030                                         duration = scheduled_duration)
00031         start_time = time.time()
00032         test_duration = 0.0 
00033         test.start()
00034         while not test.done and test_duration < scheduled_duration + 2.0:
00035             time.sleep(0.2)
00036             test_duration = time.time() - start_time
00037 
00038         self.assertTrue( test.done, "Test did not end within time limit. " + 
00039                     "Scheduled duration was %.2fsec while elapsed duration so far is %.2fsec"% 
00040                     (scheduled_duration, test_duration) )
00041         
00042         self.assertTrue( time.time() - start_time > scheduled_duration - 0.5, 
00043                        "Test ended too fast, it was scheduled for %.2fsec and it lasted for only  %.2fsec"%
00044                        (scheduled_duration, time.time() - start_time) )
00045 
00046     def run_periodic_feedback_test(self, update_interval, test_duration = 5.0):        
00047         self.last_feedback_time = None
00048         self.feedback_interval_too_small = False
00049         self.feedback_interval_too_large = False
00050         self.feedback_interval_too_small_value = 0.0
00051         self.feedback_interval_too_large_value = 0.0
00052         self.feedback_calls = 0
00053         self.update_interval = update_interval
00054 
00055         test = self.srcnode.create_test(sink_ip = "127.0.0.1", sink_port = 12345,
00056                                         duration = test_duration, update_interval = update_interval)
00057         test.set_custom_feedback_handler(self.feedback_period_test_handler) 
00058         test.start()
00059 
00060         while not test.done:
00061             time.sleep(1.0)
00062 
00063         self.assertFalse(self.feedback_interval_too_large, 
00064                          "Interval between two feedback calls was too large: expected (%.2f sec) was (%.2f sec)"%
00065                          (self.update_interval, self.feedback_interval_too_large_value) )
00066 
00067         self.assertFalse(self.feedback_interval_too_small, 
00068                         "Interval between two feedback calls was too small: expected (%.2f sec) was (%.2f sec)"%
00069                          (self.update_interval, self.feedback_interval_too_small_value) )
00070         
00071         conservative_expected_feedback_calls = int((test_duration - 1.0)/self.update_interval)
00072         self.assertTrue(self.feedback_calls >= conservative_expected_feedback_calls,
00073                         "Expected at least (%d) feedback calls, but got only (%d)"%
00074                         (conservative_expected_feedback_calls, self.feedback_calls))
00075 
00076     def test_feedback_period(self):        
00077         self.run_periodic_feedback_test(0.5, 5.0)
00078         self.run_periodic_feedback_test(0.2, 3.0)
00079         self.run_periodic_feedback_test(0.05, 3.0)
00080 
00081     def feedback_period_test_handler(self, gh, feedback):
00082         self.feedback_calls += 1
00083         curr_time = time.time()
00084         if self.last_feedback_time is not None:
00085             if curr_time - self.last_feedback_time > self.update_interval * 1.5:
00086                 self.feedback_interval_too_large = True
00087                 self.feedback_interval_too_large_value = curr_time - self.last_feedback_time
00088             elif curr_time - self.last_feedback_time < self.update_interval * 0.5:
00089                 self.feedback_interval_too_small = True
00090                 self.feedback_interval_too_small_value = curr_time - self.last_feedback_time
00091         self.last_feedback_time = curr_time
00092 
00093     def test_stop_test(self):
00094         test = self.srcnode.create_test("finite_duration_test", 
00095                                         sink_ip = "127.0.0.1", sink_port = 12345, 
00096                                         duration = 5.0, bw = 1e6 )
00097         test.start()
00098         time.sleep(1.0)
00099         test.stop()
00100         time.sleep(1.0)
00101         
00102         self.assertTrue(test.done,
00103                         "Test should have been stopped")
00104         self.assertTrue(test.overall_bandwidth > 0.6e6,
00105                         "Expected overall bandwidth to be ~1Mbit/s, instead it was %.2fMbit/s"%
00106                         (test.overall_bandwidth/1e6))
00107         
00108 
00109 if __name__ == '__main__':
00110     try:
00111         rostest.run('network_monitor_udp', 'blahblah', BasicTest)
00112     except KeyboardInterrupt, e:
00113         pass


network_control_tests
Author(s): Catalin Drula
autogenerated on Wed Sep 16 2015 04:38:36