$search
#! /usr/bin/env python
"""ROS node that shapes traffic on a network interface with ``tc``.

Bandwidth is limited with a ``tbf`` qdisc and latency/loss are emulated with
a ``netem`` qdisc attached below it.  Egress rules are installed directly on
the interface; ingress traffic is redirected to an ifb device and shaped
there.  Settings are driven through a dynamic_reconfigure server.
"""

from subprocess import call, STDOUT
import shlex
import math

import roslib; roslib.load_manifest('network_traffic_control')
import rospy

import dynamic_reconfigure.server
from network_traffic_control.cfg import TrafficControlConfig

DEFAULT_PACKET_SIZE = 1500
PACKET_QUEUE_SIZE = 1
MAX_PACKET_BURST = 5


class TrafficControlManager:
    """Install, update and remove ``tc`` rules for one network interface.

    ``self.config`` holds the settings currently applied (bandwidth, latency
    and loss per direction, the packet size, plus ``status``/``errmsg`` of the
    last reconfigure call).  ``egress_control`` / ``ingress_control`` tell
    whether the qdisc tree for that direction is currently installed.
    """

    def __init__(self, interface,
                 interface_ifb="ifb0",
                 filter_ingress="u32 match u32 0 0",
                 filter_egress="u32 match u32 0 0"):
        """Remember the devices and filters, then clear any existing rules.

        interface      -- device whose traffic is shaped
        interface_ifb  -- ifb device that ingress traffic is redirected to
        filter_ingress -- tc filter expression selecting ingress packets
        filter_egress  -- tc filter expression selecting egress packets
        """
        self.interface = interface
        self.interface_ifb = interface_ifb
        self.filter_egress = filter_egress
        self.filter_ingress = filter_ingress
        self.ifb_not_set_up = True
        # Sink for the output of every external command.
        self.devnull = open('/dev/null', 'w')
        self.config = dict()
        self.reset_egress()
        self.reset_ingress()

    def _run(self, command):
        """Run a command line silently and return its exit status."""
        return call(shlex.split(command), stdout=self.devnull, stderr=STDOUT)

    def _fail(self, message):
        """Mark the current reconfigure as failed and record *message*.

        Messages are accumulated so that an ingress failure does not hide an
        egress failure reported earlier in the same reconfigure call.
        """
        self.config['status'] = "FAIL"
        previous = self.config.get('errmsg', '')
        if previous:
            message = previous + "; " + message
        self.config['errmsg'] = message

    def reset_egress(self):
        """Delete the root qdisc on the interface and zero the egress settings."""
        # The exit status is ignored: there may be nothing to delete.
        self._run("tc qdisc del dev " + self.interface + " root")
        self.config['bandwidth_egress'] = 0.0
        self.config['latency_egress'] = 0.0
        self.config['loss_egress'] = 0.0
        self.egress_control = False

    def init_egress(self):
        """Install the egress qdisc tree: htb root -> htb class -> tbf.

        On the first failing step the error is recorded, the egress rules
        are reset and ``egress_control`` stays False.
        """
        dev = self.interface
        steps = (
            # htb qdisc
            ("tc qdisc add dev " + dev + " handle 1: root htb",
             "Failed to add htb qdisc at root on " + dev),
            # htb class
            ("tc class add dev " + dev + " parent 1: classid 1:1"
             + " htb rate 10000Mbps",
             "Failed to add htb class on " + dev),
            # filter
            ("tc filter add dev " + dev + " protocol ip prio 1 "
             + self.filter_egress + " flowid 1:1",
             "Failed to set egress filter on " + dev),
            # tbf qdisc
            ("tc qdisc add dev " + dev + " parent 1:1 handle 2:1 "
             + "tbf rate 10000mbit buffer 500000 limit 100000",
             "Failed to add tbf qdisc on " + dev),
        )
        for command, error in steps:
            if self._run(command) != 0:
                self._fail(error)
                self.reset_egress()
                return
        # enable egress control
        self.egress_control = True

    def set_rules(self, config, direction):
        """Apply bandwidth/latency/loss from *config* for one direction.

        config    -- requested settings; reads ``packet_size`` and the
                     ``bandwidth_``/``latency_``/``loss_`` entries of
                     *direction*.  Bandwidth is converted to kbit by dividing
                     by 1e3, latency to ms by multiplying by 1e3, and loss is
                     passed to netem as a percentage.
        direction -- "egress" or "ingress"

        On success the applied values are stored in ``self.config``.  On a
        failing tc command the error is recorded and the rules of that
        direction are reset.  An unknown direction only records an error.
        """
        if direction == "egress":
            interface = self.interface
            reset = self.reset_egress
        elif direction == "ingress":
            interface = self.interface_ifb
            reset = self.reset_ingress
        else:
            self._fail("Unknown direction: " + str(direction))
            # Nothing can be applied without a valid direction; continuing
            # would hit unbound local variables below.
            return

        bandwidth_key = 'bandwidth_' + direction
        latency_key = 'latency_' + direction
        loss_key = 'loss_' + direction

        bandwidth_new = config[bandwidth_key]
        latency_new = config[latency_key]
        loss_new = config[loss_key]

        bandwidth_old = self.config[bandwidth_key]
        latency_old = self.config[latency_key]
        loss_old = self.config[loss_key]

        packet_size = config['packet_size']

        netem_wanted = latency_new != 0.0 or loss_new != 0.0
        netem_active = latency_old != 0.0 or loss_old != 0.0
        latency_loss_disabled = netem_active and not netem_wanted
        latency_loss_enabled = netem_wanted and not netem_active

        tbf_needs_update = (bandwidth_new != bandwidth_old
                            or packet_size != self.config['packet_size']
                            or latency_loss_disabled)

        if tbf_needs_update:
            if bandwidth_new > 0.0:
                bw_kbits = bandwidth_new / 1e3
                bw_limit = (packet_size + 100) * PACKET_QUEUE_SIZE
                bw_buffer = (packet_size + 100) * MAX_PACKET_BURST
            else:
                # No bandwidth limit requested: fall back to the wide-open
                # values the tbf qdisc is created with.
                bw_kbits = 10000 * 1e3  # 10Gbits
                bw_limit = 100000
                bw_buffer = 500000

            tbf_args = "rate %.2fkbit buffer %d limit %d" % (
                bw_kbits, bw_buffer, bw_limit)
            ret = self._run("tc qdisc change dev " + interface
                            + " handle 2:1 tbf " + tbf_args)
            if ret != 0:
                self._fail("Failed to change tbf (bandwidth) rule on " + interface)
                reset()
                return

            # After a tbf change the netem rule is always (re-)added --
            # presumably the change drops the child qdisc.
            netem_action = "add"
        elif latency_loss_enabled:
            netem_action = "add"
        else:
            netem_action = "change"

        # touch the netem rule (add or change it) only if latency or loss are
        # not zero and either they have changed since last update or the tbf
        # rule has changed
        netem_changed = (netem_action == "add"
                         or latency_new != latency_old
                         or loss_new != loss_old)
        if netem_wanted and netem_changed:
            latency_ms = 1e3 * latency_new
            if bandwidth_new == 0.0:
                limit = 1000
            else:
                # One packet more than fits into the link during the delay.
                packet_time = 8.0 * float(packet_size) / bandwidth_new
                limit = int(math.ceil(1.0 + latency_new / packet_time))

            netem_args = "netem latency %.2fms loss %.2f%% limit %d" % (
                latency_ms, loss_new, limit)
            ret = self._run("tc qdisc " + netem_action + " dev " + interface
                            + " parent 2:1 handle 3:1 " + netem_args)
            if ret != 0:
                self._fail("Failed to " + netem_action + " netem qdisc on "
                           + interface)
                reset()
                return

        self.config[bandwidth_key] = bandwidth_new
        self.config[latency_key] = latency_new
        self.config[loss_key] = loss_new

    def reset_ingress(self):
        """Delete the ingress qdisc and the ifb root qdisc; zero the settings."""
        # Exit statuses are ignored: there may be nothing to delete.
        self._run("tc qdisc del dev " + self.interface + " ingress")
        self._run("tc qdisc del dev " + self.interface_ifb + " root")
        self.config['bandwidth_ingress'] = 0.0
        self.config['loss_ingress'] = 0.0
        self.config['latency_ingress'] = 0.0
        self.ingress_control = False

    def init_ingress(self):
        """Redirect ingress traffic to the ifb device and put a tbf qdisc on it.

        On the first failing tc step the error is recorded, the ingress
        rules are reset and ``ingress_control`` stays False.
        """
        # set up ifb device (once per manager; results are not checked)
        if self.ifb_not_set_up:
            self._run("modprobe ifb")
            self._run("ip link set dev " + self.interface_ifb + " up")
            self.ifb_not_set_up = False

        steps = (
            # add ingress qdisc
            ("tc qdisc add dev " + self.interface + " handle ffff: ingress",
             "Failed to add ingress qdisc at root on " + self.interface),
            # add redirect filter
            ("tc filter add dev " + self.interface + " parent ffff: protocol ip "
             + self.filter_ingress
             + " action mirred egress redirect dev " + self.interface_ifb,
             "Failed to add redirect filter on ingress for " + self.interface),
            # add tbf root rule on ifb-device
            ("tc qdisc add dev " + self.interface_ifb + " root handle 2:1 "
             + "tbf rate 10000mbit buffer 500000 limit 100000",
             "Failed to add tbf qdisc at root on " + self.interface_ifb),
        )
        for command, error in steps:
            if self._run(command) != 0:
                self._fail(error)
                self.reset_ingress()
                return
        # enable ingress control
        self.ingress_control = True

    def _shaping_requested(self, config, direction):
        """Return True if any setting of *direction* in *config* is non-zero."""
        return (config['bandwidth_' + direction] != 0.0
                or config['latency_' + direction] != 0.0
                or config['loss_' + direction] != 0.0)

    def reconfigure(self, config, level):
        """dynamic_reconfigure callback: apply *config* and report the result.

        Level bit 1 triggers the egress update, bit 2 the ingress update and
        bit 4 both (presumably the packet-size level -- confirm against the
        .cfg file).  Rules of a direction are removed when all of its
        settings are zero and installed when any becomes non-zero.

        Returns ``self.config``: the settings actually applied, with
        ``status`` ("OK"/"FAIL") and ``errmsg``.
        """
        self.config['status'] = 'OK'
        self.config['errmsg'] = ''

        if 'packet_size' not in self.config:
            self.config['packet_size'] = config['packet_size']

        if 1 & level or 4 & level:
            wanted = self._shaping_requested(config, "egress")
            if self.egress_control and not wanted:
                self.reset_egress()
            if not self.egress_control and wanted:
                self.init_egress()
            # Still False here if init_egress failed.
            if self.egress_control:
                self.set_rules(config, "egress")

        if 2 & level or 4 & level:
            wanted = self._shaping_requested(config, "ingress")
            if self.ingress_control and not wanted:
                self.reset_ingress()
            if not self.ingress_control and wanted:
                self.init_ingress()
            # Still False here if init_ingress failed.
            if self.ingress_control:
                self.set_rules(config, "ingress")

        self.config['packet_size'] = config['packet_size']

        return self.config


if __name__ == "__main__":
    rospy.init_node("traffic_control_node")

    # "~interface" is mandatory; the remaining parameters fall back to the
    # TrafficControlManager defaults when they are not set.
    interface = rospy.get_param("~interface")
    args = dict()
    for name in ("interface_ifb", "filter_egress", "filter_ingress"):
        try:
            args[name] = rospy.get_param("~" + name)
        except KeyError:
            pass

    tc_mgr = TrafficControlManager(interface, **args)

    try:
        dynamic_reconfigure.server.Server(TrafficControlConfig, tc_mgr.reconfigure)
        rospy.spin()
    finally:
        # Always leave the interfaces without traffic-control rules.
        tc_mgr.reset_egress()
        tc_mgr.reset_ingress()