traffic_control_node.py
Go to the documentation of this file.
00001 #! /usr/bin/env python
00002 
00003 from subprocess import call, STDOUT
00004 import shlex 
00005 import math
00006 
00007 import roslib; roslib.load_manifest('network_traffic_control')
00008 import rospy
00009 
00010 import dynamic_reconfigure.server
00011 from network_traffic_control.cfg import TrafficControlConfig
00012 
00013 DEFAULT_PACKET_SIZE = 1500  # not referenced by any code visible in this listing
00014 PACKET_QUEUE_SIZE = 1  # multiplier for the tbf "limit": (packet_size + 100) * PACKET_QUEUE_SIZE
00015 MAX_PACKET_BURST = 5  # multiplier for the tbf "buffer": (packet_size + 100) * MAX_PACKET_BURST
00016 
class TrafficControlManager:
    """Shape egress and ingress traffic of one network interface via `tc`.

    Egress is shaped directly on ``interface`` (htb root -> htb class ->
    tbf -> optional netem).  Ingress is redirected to ``interface_ifb`` and
    shaped there (tbf root -> optional netem).

    ``self.config`` holds the currently applied settings plus the
    ``status``/``errmsg`` of the last reconfiguration; it is the dictionary
    returned by :meth:`reconfigure`.
    """

    def __init__(self, interface,
                 interface_ifb="ifb0",
                 filter_ingress="u32 match u32 0 0",
                 filter_egress="u32 match u32 0 0"):
        """Store the settings and remove any existing shaping rules.

        interface      -- device whose traffic is shaped
        interface_ifb  -- device that ingress traffic is redirected to
        filter_ingress -- tc filter expression selecting ingress traffic
        filter_egress  -- tc filter expression selecting egress traffic
        """
        self.interface = interface
        self.interface_ifb = interface_ifb
        self.filter_egress = filter_egress
        self.filter_ingress = filter_ingress
        self.ifb_not_set_up = True
        # Output of every external command is discarded into this handle.
        self.devnull = open('/dev/null', 'w')
        self.config = dict()
        self.reset_egress()
        self.reset_ingress()

    def _run(self, command):
        """Run *command* (one shell-style string) quietly; return its exit status."""
        return call(shlex.split(command),
                    stdout=self.devnull, stderr=STDOUT)

    def _fail(self, message):
        """Mark the current configuration as failed and record *message*.

        Messages accumulate (separated by "; ") so that an egress failure is
        not lost when an ingress failure follows in the same reconfiguration.
        """
        self.config['status'] = "FAIL"
        previous = self.config.get('errmsg', '')
        if previous:
            self.config['errmsg'] = previous + "; " + message
        else:
            self.config['errmsg'] = message

    def reset_egress(self):
        """Delete the root qdisc on the interface and zero the egress settings."""
        # The exit status is ignored on purpose: there may be nothing to delete.
        self._run("tc qdisc del dev " + self.interface + " root")
        self.config['bandwidth_egress'] = 0.0
        self.config['latency_egress'] = 0.0
        self.config['loss_egress'] = 0.0
        self.egress_control = False

    def init_egress(self):
        """Install the egress qdisc chain; enable egress control on success.

        On the first failing step the failure is recorded in ``self.config``,
        the egress rules are reset, and egress control stays disabled.
        """
        dev = self.interface
        steps = (
            # htb qdisc
            ("tc qdisc add dev " + dev + " handle 1: root htb",
             "Failed to add htb qdisc at root on " + dev),
            # htb class
            ("tc class add dev " + dev + " parent 1: classid 1:1"
             " htb rate 10000Mbps",
             "Failed to add htb class on " + dev),
            # filter
            ("tc filter add dev " + dev + " protocol ip prio 1 " +
             self.filter_egress + " flowid 1:1",
             "Failed to set egress filter on " + dev),
            # tbf qdisc
            ("tc qdisc add dev " + dev + " parent 1:1 handle 2:1 "
             "tbf rate 10000mbit buffer 500000 limit 100000",
             "Failed to add tbf qdisc on " + dev),
        )
        for command, error in steps:
            if self._run(command) != 0:
                self._fail(error)
                self.reset_egress()
                return
        # enable egress control
        self.egress_control = True

    def set_rules(self, config, direction):
        """Apply bandwidth/latency/loss from *config* for one direction.

        config    -- mapping with ``packet_size`` and the
                     ``bandwidth_*``/``latency_*``/``loss_*`` entries for the
                     requested direction
        direction -- "egress" or "ingress"; anything else is recorded as a
                     failure and nothing is changed

        Bandwidth is formatted as kbit after dividing by 1e3 and latency as
        ms after multiplying by 1e3.  On a failing `tc` call the failure is
        recorded and the rules of that direction are reset.  On success the
        applied values are stored in ``self.config``.
        """
        if direction == "egress":
            interface = self.interface
            reset = self.reset_egress
        elif direction == "ingress":
            interface = self.interface_ifb
            reset = self.reset_ingress
        else:
            # Previously this fell through and crashed on unbound locals.
            self._fail("Unknown direction: " + direction)
            return

        bandwidth_key = 'bandwidth_' + direction
        latency_key = 'latency_' + direction
        loss_key = 'loss_' + direction

        latency_new = config[latency_key]
        loss_new = config[loss_key]
        bandwidth_new = config[bandwidth_key]

        latency_old = self.config[latency_key]
        loss_old = self.config[loss_key]
        bandwidth_old = self.config[bandwidth_key]

        netem_wanted = latency_new != 0.0 or loss_new != 0.0
        netem_present = latency_old != 0.0 or loss_old != 0.0
        latency_loss_disabled = netem_present and not netem_wanted
        latency_loss_enabled = netem_wanted and not netem_present

        if bandwidth_new != bandwidth_old or \
                config['packet_size'] != self.config['packet_size'] or \
                latency_loss_disabled:
            if bandwidth_new > 0.0:
                bw_kbits = bandwidth_new / 1e3
                bw_limit = (config['packet_size'] + 100) * PACKET_QUEUE_SIZE
                bw_buffer = (config['packet_size'] + 100) * MAX_PACKET_BURST
            else:
                bw_kbits = 10000 * 1e3  # 10Gbits
                bw_limit = 100000
                bw_buffer = 500000

            tbf_command = ("tc qdisc change dev " + interface +
                           " handle 2:1 tbf rate %.2fkbit buffer %d limit %d"
                           % (bw_kbits, bw_buffer, bw_limit))
            if self._run(tbf_command) != 0:
                self._fail("Failed to change tbf (bandwidth) rule on " + interface)
                reset()
                return

            # NOTE(review): after a tbf change netem is always (re)added, never
            # changed -- presumably the tbf change discards its child qdisc;
            # confirm against tc behaviour.
            netem_action = "add"
        elif latency_loss_enabled:
            netem_action = "add"
        else:
            netem_action = "change"

        # touch the netem rule (add or change it) only if latency or loss are not zero
        # and either they have changed since last update or the tbf rule has changed
        if netem_wanted and \
                (netem_action == "add" or latency_new != latency_old or loss_new != loss_old):
            latency_ms = 1e3 * latency_new
            if bandwidth_new == 0.0:
                limit = 1000
            else:
                # one packet plus the number of packets sent during `latency`
                # at the configured bandwidth
                limit = int(math.ceil(
                    1.0 + latency_new / (8.0 * float(config['packet_size']) / bandwidth_new)))

            netem_command = ("tc qdisc " + netem_action + " dev " + interface +
                             " parent 2:1 handle 3:1 "
                             "netem latency %.2fms loss %.2f%% limit %d"
                             % (latency_ms, loss_new, limit))
            if self._run(netem_command) != 0:
                self._fail("Failed to " + netem_action + " netem qdisc on " + interface)
                reset()
                return

        self.config[bandwidth_key] = bandwidth_new
        self.config[latency_key] = latency_new
        self.config[loss_key] = loss_new

    def reset_ingress(self):
        """Delete the ingress qdisc and the ifb root qdisc; zero the ingress settings."""
        # Exit statuses are ignored on purpose: there may be nothing to delete.
        self._run("tc qdisc del dev " + self.interface + " ingress")
        self._run("tc qdisc del dev " + self.interface_ifb + " root")
        self.config['bandwidth_ingress'] = 0.0
        self.config['loss_ingress'] = 0.0
        self.config['latency_ingress'] = 0.0
        self.ingress_control = False

    def init_ingress(self):
        """Install ingress redirection and the ifb tbf qdisc; enable ingress control.

        The ifb module is loaded and the ifb device brought up only on the
        first call.  On the first failing step the failure is recorded in
        ``self.config``, the ingress rules are reset, and ingress control
        stays disabled.
        """
        # set up ifb device
        if self.ifb_not_set_up:
            self._run("modprobe ifb")
            self._run("ip link set dev " + self.interface_ifb + " up")
            self.ifb_not_set_up = False

        steps = (
            # add ingress qdisc
            ("tc qdisc add dev " + self.interface + " handle ffff: ingress",
             "Failed to add ingress qdisc at root on " + self.interface),
            # add redirect filter
            ("tc filter add dev " + self.interface + " parent ffff: protocol ip " +
             self.filter_ingress +
             " action mirred egress redirect dev " + self.interface_ifb,
             "Failed to add redirect filter on ingress for " + self.interface),
            # add tbf root rule on ifb-device
            ("tc qdisc add dev " + self.interface_ifb + " root handle 2:1 " +
             "tbf rate 10000mbit buffer 500000 limit 100000",
             "Failed to add tbf qdisc at root on " + self.interface_ifb),
        )
        for command, error in steps:
            if self._run(command) != 0:
                self._fail(error)
                self.reset_ingress()
                return
        # enable ingress control
        self.ingress_control = True

    def reconfigure(self, config, level):
        """Dynamic-reconfigure callback: apply *config*, return the applied state.

        config -- requested settings (same keys as read by :meth:`set_rules`)
        level  -- bit mask; bit 1 or 4 triggers egress handling, bit 2 or 4
                  triggers ingress handling

        For each triggered direction: shaping is torn down when all three
        values are zero, set up when it is inactive and any value is non-zero,
        and the rules are then applied while shaping is active.  Returns
        ``self.config`` with ``status`` "OK" or "FAIL" and ``errmsg``.
        """
        self.config['status'] = 'OK'
        self.config['errmsg'] = ''

        if 'packet_size' not in self.config:
            self.config['packet_size'] = config['packet_size']

        if 1 & level or 4 & level:
            egress_wanted = (config['bandwidth_egress'] != 0.0 or
                             config['latency_egress'] != 0.0 or
                             config['loss_egress'] != 0.0)
            if self.egress_control and not egress_wanted:
                self.reset_egress()
            if not self.egress_control and egress_wanted:
                self.init_egress()
            if self.egress_control:
                self.set_rules(config, "egress")

        if 2 & level or 4 & level:
            ingress_wanted = (config['bandwidth_ingress'] != 0.0 or
                              config['latency_ingress'] != 0.0 or
                              config['loss_ingress'] != 0.0)
            if self.ingress_control and not ingress_wanted:
                self.reset_ingress()
            if not self.ingress_control and ingress_wanted:
                self.init_ingress()
            if self.ingress_control:
                self.set_rules(config, "ingress")

        self.config['packet_size'] = config['packet_size']

        return self.config
00261   
00262 
if __name__ == "__main__":
    # Node entry point: read parameters, build the manager, serve
    # dynamic-reconfigure requests until shutdown.
    rospy.init_node("traffic_control_node")

    # "~interface" is mandatory; a missing value propagates as an exception.
    interface = rospy.get_param("~interface")

    # Optional parameters are forwarded only when present, so the
    # constructor defaults apply otherwise.
    optional = dict()
    for name in ("interface_ifb", "filter_egress", "filter_ingress"):
        try:
            optional[name] = rospy.get_param("~" + name)
        except KeyError:
            continue

    tc_mgr = TrafficControlManager(interface, **optional)

    try:
        dynamic_reconfigure.server.Server(TrafficControlConfig, tc_mgr.reconfigure)
        rospy.spin()
    finally:
        # Always remove the shaping rules on exit.
        tc_mgr.reset_egress()
        tc_mgr.reset_ingress()


network_traffic_control
Author(s): Catalin Drula
autogenerated on Thu Jan 2 2014 11:27:10