00001
00002
00003 from subprocess import call, STDOUT
00004 import shlex
00005 import math
00006
00007 import roslib; roslib.load_manifest('network_traffic_control')
00008 import rospy
00009
00010 import dynamic_reconfigure.server
00011 from network_traffic_control.cfg import TrafficControlConfig
00012
# Default packet size; not referenced by the code visible in this file.
# NOTE(review): presumably bytes (a typical Ethernet MTU) -- confirm.
DEFAULT_PACKET_SIZE = 1500
# Multiplier applied to (packet_size + 100) to obtain the tbf "limit"
# argument in TrafficControlManager.set_rules.
PACKET_QUEUE_SIZE = 1
# Multiplier applied to (packet_size + 100) to obtain the tbf "buffer"
# argument in TrafficControlManager.set_rules.
MAX_PACKET_BURST = 5
00016
class TrafficControlManager:
    """Apply bandwidth, latency and loss rules to an interface via ``tc``.

    Egress traffic is handled directly on ``interface``: an htb root qdisc
    with one class, a filter steering matched packets into that class, and
    a tbf qdisc below it.  Ingress traffic is redirected from ``interface``
    to ``interface_ifb`` and handled by a tbf qdisc at the root of that
    device.  In both directions a netem qdisc is attached below the tbf
    qdisc when latency or loss is requested.

    The currently applied settings, together with ``status`` / ``errmsg``
    of the last reconfiguration, are kept in ``self.config``.
    """

    def __init__(self, interface,
                 interface_ifb="ifb0",
                 filter_ingress="u32 match u32 0 0",
                 filter_egress="u32 match u32 0 0"):
        """Store the settings and remove any existing rules.

        interface      -- device whose traffic is controlled
        interface_ifb  -- device that ingress traffic is redirected to
        filter_ingress -- ``tc filter`` selector for ingress packets
        filter_egress  -- ``tc filter`` selector for egress packets
        """
        self.interface = interface
        self.interface_ifb = interface_ifb
        self.filter_egress = filter_egress
        self.filter_ingress = filter_ingress
        self.ifb_not_set_up = True
        # Output of every external command is discarded through this handle.
        self.devnull = open('/dev/null', 'w')
        self.config = dict()
        self.reset_egress()
        self.reset_ingress()

    def _run(self, command):
        """Run ``command`` (one shell-style string) quietly; return its exit status."""
        return call(shlex.split(command),
                    stdout=self.devnull, stderr=STDOUT)

    def _fail(self, message):
        """Mark the current configuration as failed and record ``message``.

        The message is appended to any error text already present so that
        a failure in one direction does not hide an earlier failure in the
        other one.
        """
        previous = self.config.get('errmsg', '')
        self.config['status'] = "FAIL"
        if previous:
            self.config['errmsg'] = previous + "; " + message
        else:
            self.config['errmsg'] = message

    def _shaping_requested(self, config, direction):
        """Return True if ``config`` has a non-zero setting for ``direction``."""
        return (config['bandwidth_' + direction] != 0.0 or
                config['latency_' + direction] != 0.0 or
                config['loss_' + direction] != 0.0)

    def reset_egress(self):
        """Delete the root qdisc of the interface and zero the egress settings."""
        # The exit status is ignored on purpose: the delete is best effort.
        self._run("tc qdisc del dev " + self.interface + " root")
        self.config['bandwidth_egress'] = 0.0
        self.config['latency_egress'] = 0.0
        self.config['loss_egress'] = 0.0
        self.egress_control = False

    def init_egress(self):
        """Create the egress qdisc hierarchy on the interface.

        On the first failing command the failure is recorded in
        ``self.config``, the egress rules are reset and the method returns
        with ``egress_control`` left False.
        """
        dev = self.interface
        steps = (
            ("tc qdisc add dev " + dev + " handle 1: root htb",
             "Failed to add htb qdisc at root on " + dev),
            ("tc class add dev " + dev + " parent 1: classid 1:1"
             " htb rate 10000Mbps",
             "Failed to add htb class on " + dev),
            ("tc filter add dev " + dev + " protocol ip prio 1 " +
             self.filter_egress + " flowid 1:1",
             "Failed to set egress filter on " + dev),
            ("tc qdisc add dev " + dev + " parent 1:1 handle 2:1 "
             "tbf rate 10000mbit buffer 500000 limit 100000",
             "Failed to add tbf qdisc on " + dev),
        )
        for command, error in steps:
            if self._run(command) != 0:
                self._fail(error)
                self.reset_egress()
                return

        self.egress_control = True

    def set_rules(self, config, direction):
        """Apply the settings in ``config`` for one direction.

        config    -- mapping with ``packet_size`` and the
                     ``bandwidth_*``, ``latency_*`` and ``loss_*`` entries
                     of the requested direction
        direction -- ``"egress"`` or ``"ingress"``

        The bandwidth is passed to tbf after dividing by 1e3 (as kbit), the
        latency to netem after multiplying by 1e3 (as ms) and the loss as a
        percentage.  On failure the error is recorded in ``self.config``
        and the rules of that direction are reset.  An unknown direction is
        reported as a failure without touching any rule.
        """
        if direction == "egress":
            interface = self.interface
            reset = self.reset_egress
        elif direction == "ingress":
            interface = self.interface_ifb
            reset = self.reset_ingress
        else:
            # Return here: nothing below can work without a valid direction.
            self._fail("Unknown direction: " + str(direction))
            return

        latency_key = 'latency_' + direction
        loss_key = 'loss_' + direction
        bandwidth_key = 'bandwidth_' + direction

        latency_new = config[latency_key]
        loss_new = config[loss_key]
        bandwidth_new = config[bandwidth_key]

        latency_old = self.config[latency_key]
        loss_old = self.config[loss_key]
        bandwidth_old = self.config[bandwidth_key]

        netem_wanted = latency_new != 0.0 or loss_new != 0.0
        netem_present = latency_old != 0.0 or loss_old != 0.0
        latency_loss_disabled = netem_present and not netem_wanted
        latency_loss_enabled = netem_wanted and not netem_present

        if (bandwidth_new != bandwidth_old or
                config['packet_size'] != self.config['packet_size'] or
                latency_loss_disabled):
            if bandwidth_new > 0.0:
                bw_kbits = bandwidth_new / 1e3
                bw_limit = (config['packet_size'] + 100) * PACKET_QUEUE_SIZE
                bw_buffer = (config['packet_size'] + 100) * MAX_PACKET_BURST
            else:
                # No bandwidth limit requested: same values as used when
                # the tbf qdisc is first created.
                bw_kbits = 10000 * 1e3
                bw_limit = 100000
                bw_buffer = 500000

            ret = self._run("tc qdisc change dev " + interface +
                            " handle 2:1 tbf "
                            "rate %.2fkbit buffer %d limit %d"
                            % (bw_kbits, bw_buffer, bw_limit))
            if ret != 0:
                self._fail("Failed to change tbf (bandwidth) rule on " +
                           interface)
                reset()
                return

            # NOTE(review): netem is always (re-)added after a tbf change,
            # presumably because the change discards the child qdisc --
            # confirm against the tc/tbf documentation.
            netem_action = "add"
        elif latency_loss_enabled:
            netem_action = "add"
        else:
            netem_action = "change"

        if netem_wanted and (netem_action == "add" or
                             latency_new != latency_old or
                             loss_new != loss_old):
            latency_ms = 1e3 * latency_new
            if bandwidth_new == 0.0:
                limit = 1000
            else:
                # One packet plus the number of packets that are sent at
                # the configured bandwidth during one latency period.
                packet_time = 8.0 * float(config['packet_size']) / bandwidth_new
                limit = int(math.ceil(1.0 + latency_new / packet_time))

            ret = self._run("tc qdisc " + netem_action + " dev " + interface +
                            " parent 2:1 handle 3:1 "
                            "netem latency %.2fms loss %.2f%% limit %d"
                            % (latency_ms, loss_new, limit))
            if ret != 0:
                self._fail("Failed to " + netem_action +
                           " netem qdisc on " + interface)
                reset()
                return

        self.config[bandwidth_key] = bandwidth_new
        self.config[latency_key] = latency_new
        self.config[loss_key] = loss_new

    def reset_ingress(self):
        """Delete the ingress qdisc and the ifb root qdisc; zero the ingress settings."""
        # Exit statuses are ignored on purpose: both deletes are best effort.
        self._run("tc qdisc del dev " + self.interface + " ingress")
        self._run("tc qdisc del dev " + self.interface_ifb + " root")
        self.config['bandwidth_ingress'] = 0.0
        self.config['loss_ingress'] = 0.0
        self.config['latency_ingress'] = 0.0
        self.ingress_control = False

    def init_ingress(self):
        """Redirect ingress traffic to the ifb device and add a tbf qdisc there.

        The ifb module is loaded and the ifb device brought up the first
        time this method runs.  On the first failing ``tc`` command the
        failure is recorded in ``self.config``, the ingress rules are reset
        and the method returns with ``ingress_control`` left False.
        """
        if self.ifb_not_set_up:
            self._run("modprobe ifb")
            self._run("ip link set dev " + self.interface_ifb + " up")
            self.ifb_not_set_up = False

        dev = self.interface
        ifb = self.interface_ifb
        steps = (
            ("tc qdisc add dev " + dev + " handle ffff: ingress",
             "Failed to add ingress qdisc at root on " + dev),
            ("tc filter add dev " + dev + " parent ffff: protocol ip " +
             self.filter_ingress +
             " action mirred egress redirect dev " + ifb,
             "Failed to add redirect filter on ingress for " + dev),
            ("tc qdisc add dev " + ifb + " root handle 2:1 " +
             "tbf rate 10000mbit buffer 500000 limit 100000",
             "Failed to add tbf qdisc at root on " + ifb),
        )
        for command, error in steps:
            if self._run(command) != 0:
                self._fail(error)
                self.reset_ingress()
                return

        self.ingress_control = True

    def reconfigure(self, config, level):
        """Reconfiguration callback: apply ``config`` and return the result.

        config -- requested settings (``packet_size`` plus the bandwidth,
                  latency and loss entries for both directions)
        level  -- bit mask; egress is processed when ``level & 1`` or
                  ``level & 4`` is set, ingress when ``level & 2`` or
                  ``level & 4`` is set
                  (NOTE(review): bit 4 presumably marks a packet size
                  change -- confirm against the .cfg file)

        Returns ``self.config``, which holds the applied settings and the
        ``status`` (``'OK'`` or ``"FAIL"``) and ``errmsg`` of this call.
        """
        self.config['status'] = 'OK'
        self.config['errmsg'] = ''

        if 'packet_size' not in self.config:
            self.config['packet_size'] = config['packet_size']

        if level & 1 or level & 4:
            wanted = self._shaping_requested(config, "egress")
            if self.egress_control and not wanted:
                self.reset_egress()
            if not self.egress_control and wanted:
                self.init_egress()
            if self.egress_control:
                self.set_rules(config, "egress")

        if level & 2 or level & 4:
            wanted = self._shaping_requested(config, "ingress")
            if self.ingress_control and not wanted:
                self.reset_ingress()
            if not self.ingress_control and wanted:
                self.init_ingress()
            if self.ingress_control:
                self.set_rules(config, "ingress")

        self.config['packet_size'] = config['packet_size']

        return self.config
00261
00262
if __name__ == "__main__":
    rospy.init_node("traffic_control_node")

    # Mandatory parameter: no fallback is supplied for it here.
    interface = rospy.get_param("~interface")

    # Optional parameters: any that is not set is simply left out, so the
    # TrafficControlManager constructor default applies.
    args = dict()
    for _param in ("interface_ifb", "filter_egress", "filter_ingress"):
        try:
            args[_param] = rospy.get_param("~" + _param)
        except KeyError:
            pass

    tc_mgr = TrafficControlManager(interface, **args)

    try:
        dynamic_reconfigure.server.Server(TrafficControlConfig, tc_mgr.reconfigure)
        rospy.spin()
    finally:
        # Always remove the traffic control rules when the node stops.
        tc_mgr.reset_egress()
        tc_mgr.reset_ingress()