00001 import subprocess 00002 import unittest 00003 00004 class TcPortControl: 00005 def __init__(self, testcase, interface = "lo", port = 12345): 00006 self.testcase = testcase 00007 self.interface = interface 00008 self.port = port 00009 self.devnull = open('/dev/null', 'w') 00010 00011 def reset(self): 00012 subprocess.call(['tc', 'qdisc', 'del', 'dev', self.interface, 'root'], 00013 stdout = self.devnull, stderr = subprocess.STDOUT) 00014 subprocess.call(['tc', 'qdisc', 'del', 'dev', self.interface, 'ingress'], 00015 stdout = self.devnull, stderr = subprocess.STDOUT) 00016 00017 def init(self): 00018 ret = subprocess.call(['tc', 'qdisc', 'add', 'dev', self.interface, 'handle', '1:', 'root', 'htb'], 00019 stdout = self.devnull, stderr = subprocess.STDOUT) 00020 self.testcase.assertEqual(ret, 0, "Setting htb qdisc failed") 00021 ret = subprocess.call(['tc', 'qdisc', 'add', 'dev', self.interface, 'handle', 'ffff:', 'ingress'], 00022 stdout = self.devnull, stderr = subprocess.STDOUT) 00023 self.testcase.assertEqual(ret, 0, "Setting ingress qdisc failed") 00024 ret = subprocess.call(['tc', 'class', 'add', 'dev', self.interface, 'parent', '1:', 'classid', '1:1', 00025 'htb', 'rate', '1000Mbps'], stdout = self.devnull, stderr = subprocess.STDOUT) 00026 self.testcase.assertEqual(ret, 0, "Adding htb class failed") 00027 ret = subprocess.call(['tc', 'filter', 'add', 'dev', self.interface, 'protocol', 'ip', 'prio', '1', 00028 'u32', 'match', 'ip', 'dport', str(self.port), '0xffff', 'flowid', '1:1'], 00029 stdout = self.devnull, stderr = subprocess.STDOUT) 00030 self.testcase.assertEqual(ret, 0, "Adding destination port " + str(self.port) + " filter failed") 00031 00032 def set_rate_limit(self, rate): 00033 ret1 = subprocess.call(['tc', 'qdisc', 'add', 'dev', self.interface, 'parent', '1:1', 00034 'tbf', 'rate', str(rate), 'buffer', str(int(rate*625/1e5)), 00035 'limit', str(int(rate * 1171875/1e8))], 00036 stdout = self.devnull, stderr = subprocess.STDOUT) 00037 ret2 = subprocess.call(['sudo', 'tc', 'qdisc', 'change', 'dev', self.interface, 'parent', '1:1', 00038 'tbf', 'rate', str(rate), 'buffer', str(int(rate*625/1e5)), 00039 'limit', str(int(rate * 1171875/1e8))], 00040 stdout = self.devnull, stderr = subprocess.STDOUT) 00041 self.testcase.assertTrue(ret1 == 0 or ret2 == 0, 00042 "Setting the rate limit failed on loopback interface") 00043 00044 def set_latency_loss(self, latency = 0.0, loss = 0.0): 00045 ret1 = subprocess.call(['tc', 'qdisc', 'add', 'dev', self.interface, 'parent', '1:1', 00046 'netem', 'latency', '%.2fms'%(latency*1000), 'loss', '%.2f%%'%(loss)], 00047 stdout = self.devnull, stderr = subprocess.STDOUT) 00048 ret2 = subprocess.call(['tc', 'qdisc', 'change', 'dev', self.interface, 'parent', '1:1', 00049 'netem', 'latency', '%.2fms'%(latency*1000), 'loss', '%.2f%%'%(loss)], 00050 stdout = self.devnull, stderr = subprocess.STDOUT) 00051 self.testcase.assertTrue(ret1 == 0 or ret2 == 0, 00052 "Setting netem latency and loss parameters failed on loopback interface") 00053 00054 def set_latency_loss_udp_returnpath(self, latency = 0.0, loss = 0.0): 00055 subprocess.call(['modprobe', 'ifb'], 00056 stdout = self.devnull, stderr = subprocess.STDOUT) 00057 subprocess.call(['ip', 'link', 'set', 'dev', 'ifb0', 'up'], 00058 stdout = self.devnull, stderr = subprocess.STDOUT) 00059 subprocess.call(['tc', 'qdisc', 'del', 'dev', 'ifb0', 'root'], 00060 stdout = self.devnull, stderr = subprocess.STDOUT) 00061 subprocess.call(['tc', 'filter', 'add', 'dev', self.interface, 'parent', 'ffff:', 'protocol', 'ip', 00062 'u32', 'match', 'ip', 'sport', str(self.port), '0xffff', 'flowid', '1:1', 00063 'action', 'mirred', 'egress', 'redirect', 'dev', 'ifb0'], 00064 stdout = self.devnull, stderr = subprocess.STDOUT) 00065 00066 ret1 = subprocess.call(['tc', 'qdisc', 'add', 'dev', 'ifb0', 'root', 00067 'netem', 'latency', '%.2fms'%(latency*1000), 'loss', '%.2f%%'%(loss)], 00068 stdout = self.devnull, stderr = subprocess.STDOUT) 00069 ret2 = subprocess.call(['tc', 'qdisc', 'change', 'dev', 'ifb0', 'root', 00070 'netem', 'latency', '%.2fms'%(latency*1000), 'loss', '%.2f%%'%(loss)], 00071 stdout = self.devnull, stderr = subprocess.STDOUT) 00072 self.testcase.assertTrue(ret1 == 0 or ret2 == 0, 00073 "Setting netem latency and loss parameters failed on ifb0 interface")