00001
00002
00003 from __future__ import with_statement
00004
00005 import time
00006 import sys
00007 import threading
00008 import socket
00009 import struct
00010 import bisect
00011 import traceback
00012
class MonitorClient:
    """Measure UDP round-trip latency by sending probe packets and timing replies.

    A send thread emits one packet every ``1 / rate`` seconds to ``destaddr``.
    Each packet starts with a header (magic number, send timestamp, a zero
    placeholder, sequence number) and is padded with spaces to ``pkt_length``
    bytes.  A receive thread reads packets from the same socket and, for every
    packet that carries the magic number, computes the latency from the send
    timestamp embedded in the header and files it into a latency bin.

    NOTE(review): this presumes the remote peer sends each packet back with
    its header intact (the third header field is unpacked as ``echo_time``
    but never used here) -- confirm against the server implementation.
    """

    def __init__(self, latencybins, destaddr, rate, pkt_length, paused=False, sourceaddr=None):
        """Set up state, open the socket (unless paused) and start both threads.

        latencybins -- iterable of latency thresholds in seconds; a sorted copy
                       is stored with an extra infinite bin appended.
        destaddr    -- (host, port) the probes are sent to.  If it cannot be
                       resolved now, resolution is retried on every send.
        rate        -- packets per second.
        pkt_length  -- size of each probe in bytes; must be at least the
                       header size.
        paused      -- when true, no socket is opened and the threads idle
                       until start_monitor() is called.
        sourceaddr  -- optional (host, port) to bind the socket to.

        If pkt_length is too small a message is written to stderr and the
        constructor returns without starting any thread.
        """
        self.mutex = threading.Lock()       # guards outstanding / arrived
        self.outstanding = {}               # seqnum -> send time, no reply yet
        self.arrived = []                   # (send_time, latency) of replies
        self.latencybins = sorted(latencybins)
        self.latencybins.append(1e1000)     # overflows to +inf: catch-all bin
        self.interval = 1.0 / rate
        self.send_thread = threading.Thread(target=self.send_thread_entry, name="udpmoncli: send_thread")
        self.recv_thread = threading.Thread(target=self.recv_thread_entry, name="udpmoncli: recv_thread")
        self.pkt_length = pkt_length
        self.exiting = False
        self.sourceaddr = self.resolve_addr(sourceaddr)
        self.destaddr = destaddr
        self.exceptions = set()             # error strings, drained by get_statistics
        try:
            self.destaddr = self.resolve_addr(self.destaddr)
        except Exception:
            # Best effort only: the send thread retries the resolution.
            pass
        self.lost = 0

        self._reset_bins()

        self.magic, = struct.unpack("=i", struct.pack("BBBB", 0xEF, 0x41, 0xC6, 0x35))
        self.pktstruct = "=iddi"
        self.hdr_len = struct.calcsize(self.pktstruct)

        # Created before the length check so that shutdown() and
        # get_statistics() still work on an instance that never started.
        self.cv = threading.Condition()
        self.paused = True
        self.window_start = time.time()

        if pkt_length < self.hdr_len:
            sys.stderr.write("pkt_length must be at least %s\n" % self.hdr_len)
            return

        if not paused:
            self.start_monitor()

        self.recv_thread.start()
        self.send_thread.start()

    def resolve_addr(self, addr):
        """Return addr with its host part resolved to an IP address.

        addr is a (host, port) pair, or None, which is returned unchanged.
        """
        if addr is None:
            return addr
        host, port = addr
        return (socket.gethostbyname(host), port)

    def init_socket(self, sourceaddr=None):
        """Create the UDP socket, binding it if a source address is known.

        A sourceaddr that differs from the stored one replaces it.
        """
        if sourceaddr is not None and self.sourceaddr != sourceaddr:
            self.sourceaddr = self.resolve_addr(sourceaddr)

        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Short timeout so the receive loop re-checks its flags regularly.
        self.socket.settimeout(0.2)
        if self.sourceaddr:
            self.socket.bind(self.sourceaddr)

    def start_monitor(self, sourceaddr=None):
        """Open a fresh socket and wake the threads; no-op unless paused."""
        if self.paused:
            self.init_socket(sourceaddr)
            with self.cv:
                self.paused = False
                self.cv.notify_all()

    def stop_monitor(self):
        """Pause the threads and close the socket; no-op if already paused."""
        if not self.paused:
            self.paused = True
            self.socket.close()

    def _reset_bins(self):
        """Zero the cumulative per-bin reply counters."""
        self.bins = [0] * len(self.latencybins)

    def shutdown(self):
        """Ask both threads to exit and wake them if they are waiting."""
        self.exiting = True
        with self.cv:
            self.cv.notify_all()

    def send_thread_entry(self):
        """Send loop: emit one probe per interval until shutdown.

        Every exception is recorded as a string in self.exceptions and the
        loop keeps running.
        """
        next_time = time.time()
        seqnum = 0
        while not self.exiting:
            try:
                sleeptime = next_time - time.time()
                if sleeptime < -1:
                    self.exceptions.add("Send thread too far behind. Resetting expectations.")
                    next_time = time.time()
                elif sleeptime > 0:
                    time.sleep(sleeptime)
                if self.paused:
                    with self.cv:
                        # Re-check under the lock: start_monitor() clears the
                        # flag and notifies while holding it, so the wakeup
                        # cannot slip in between this test and the wait.
                        if self.paused and not self.exiting:
                            self.cv.wait()
                    next_time = time.time()
                    continue
                seqnum += 1
                next_time += self.interval
                send_time = time.time()
                hdr = struct.pack(self.pktstruct, self.magic, send_time, 0, seqnum)
                with self.mutex:
                    self.outstanding[seqnum] = send_time
                try:
                    self.destaddr = self.resolve_addr(self.destaddr)
                except Exception:
                    # Unresolvable destination: nothing is sent, the entry
                    # stays in outstanding and is later counted as missed.
                    continue
                self.socket.sendto(hdr.ljust(self.pkt_length), self.destaddr)
            except Exception:
                # sys.exc_info() keeps this valid on old and new interpreters.
                self.exceptions.add(str(sys.exc_info()[1]))

    def recv_thread_entry(self):
        """Receive loop: time every valid reply until shutdown.

        Packets that are too short for the header or carry the wrong magic
        number are dropped.  Socket timeouts are ignored; any other exception
        is recorded in self.exceptions, as the send loop does, so a bad packet
        or a socket closed by stop_monitor() cannot terminate the thread.
        """
        while not self.exiting:
            if self.paused:
                with self.cv:
                    # Same lost-wakeup guard as in the send loop.
                    if self.paused and not self.exiting:
                        self.cv.wait()
                continue
            try:
                indata = self.socket.recv(4096)
                recv_time = time.time()
                if len(indata) < self.hdr_len:
                    continue
                (magic, send_time, echo_time, seq_num) = struct.unpack(self.pktstruct, indata[0:self.hdr_len])
                if magic != self.magic:
                    continue
                latency = recv_time - send_time
                self.bins[bisect.bisect(self.latencybins, latency)] += 1
                with self.mutex:
                    self.outstanding.pop(seq_num, None)
                    self.arrived.append((send_time, latency))
            except socket.timeout:
                pass
            except Exception:
                self.exceptions.add(str(sys.exc_info()[1]))

    def get_statistics(self, window):
        """Return a Statistics object for the packets that have settled.

        A packet is settled once it was sent longer ago than the largest
        finite latency bin.  Settled replies are counted and binned; settled
        packets still without a reply are counted as missed and added to
        self.lost.  Everything newer is kept for the next call.  Recorded
        exception strings are printed to stdout and cleared.

        window -- unused; kept for interface compatibility.
        """
        bins = [0] * len(self.latencybins)
        with self.mutex:
            now = time.time()
            arrived = self.arrived
            self.arrived = []
            outstanding = self.outstanding
            self.outstanding = {}
            exceptions = self.exceptions
            self.exceptions = set()
            for message in exceptions:
                sys.stdout.write("Got exception %s\n" % message)
            # [-1] is the infinite catch-all bin, [-2] the largest real one;
            # with no real bins every packet settles immediately.
            if len(self.latencybins) > 1:
                settle_time = self.latencybins[-2]
            else:
                settle_time = 0.0
            window_end = now - settle_time
            window_start = self.window_start
            sum_latency = 0.
            count = 0
            sum_latency_restricted = 0.
            count_restricted = 0
            for pkt in arrived:
                (send_time, latency) = pkt
                if send_time < window_end:
                    bins[bisect.bisect(self.latencybins, latency)] += 1
                    count += 1
                    sum_latency += latency
                    # "Restricted" figures only cover packets sent inside
                    # the current window.
                    if send_time >= window_start:
                        count_restricted += 1
                        sum_latency_restricted += latency
                else:
                    self.arrived.append(pkt)
            missed = 0
            for (seq_num, send_time) in outstanding.items():
                if send_time < window_end:
                    missed += 1
                    self.lost += 1
                else:
                    self.outstanding[seq_num] = send_time
            self.window_start = window_end

        return Statistics(count, count_restricted, missed, sum_latency, sum_latency_restricted, bins, window_start, window_end)

    def get_smart_bins(self, window):
        """Shortcut for get_statistics(window).get_smart_bins()."""
        return self.get_statistics(window).get_smart_bins()
00193
class Statistics:
    """Latency figures for one time window.

    count                  -- number of replies counted.
    count_restricted       -- number of those replies included in the
                              restricted figures.
    missed                 -- number of packets counted as lost.
    sum_latency            -- summed latency over ``count`` replies.
    sum_latency_restricted -- summed latency over ``count_restricted`` replies.
    bins                   -- list of per-latency-bin reply counts.
    window_start/_end      -- bounds of the time window covered.
    """

    def __init__(self, count, count_restricted, missed, sum_latency, sum_latency_restricted, bins, window_start, window_end):
        """Store the given figures unchanged."""
        self.count = count
        self.count_restricted = count_restricted
        self.missed = missed
        self.sum_latency = sum_latency
        self.sum_latency_restricted = sum_latency_restricted
        self.bins = bins
        self.window_start = window_start
        self.window_end = window_end

    def get_percentage_bins(self):
        """Return each bin divided by (count_restricted + missed).

        A zero denominator is replaced by 1 so the result is always a list
        of floats.
        """
        denom = float(self.count_restricted + self.missed or 1)
        return [val / denom for val in self.bins]

    def get_average_latency(self):
        """Return sum_latency / count, or 0.0 when count is zero."""
        if not self.count:
            return 0.0
        return self.sum_latency / float(self.count)

    def get_average_latency_restricted(self):
        """Return the restricted average latency, or 0.0 when its count is zero."""
        if not self.count_restricted:
            return 0.0
        return self.sum_latency_restricted / float(self.count_restricted)

    def get_smart_bins(self):
        """Return (percentage bins, average latency, restricted average latency)."""
        return self.get_percentage_bins(), self.get_average_latency(), self.get_average_latency_restricted()

    def accumulate(self, extra_stats):
        """Combines the stats from extra_stats into the existing stats.
        Expects the time windows to be adjacent.

        Raises ValueError, before changing anything, if the bin lists differ
        in length or the windows do not share an edge."""
        if len(extra_stats.bins) != len(self.bins):
            raise ValueError("Tried to merge Statistics with different bin sizes.")

        if extra_stats.window_end == self.window_start:
            self.window_start = extra_stats.window_start
        elif extra_stats.window_start == self.window_end:
            self.window_end = extra_stats.window_end
        else:
            raise ValueError("Tried to accumulate non-adjacent Statistics.")

        for i, extra in enumerate(extra_stats.bins):
            self.bins[i] += extra

        # Each counter is merged exactly once.
        self.count += extra_stats.count
        self.count_restricted += extra_stats.count_restricted
        self.missed += extra_stats.missed
        self.sum_latency += extra_stats.sum_latency
        self.sum_latency_restricted += extra_stats.sum_latency_restricted
00249
# Command-line entry point: probe <host>:<port> and print one line of latency
# figures every half second until interrupted, then a summary on stderr.
if __name__ == "__main__":
    try:
        if len(sys.argv) not in (5, 6):
            sys.stdout.write("usage: udpmoncli.py <host> <port> <pkt_rate> <pkt_size> [<src_addr>]\n")
            sys.exit(1)
        host = sys.argv[1]
        port = int(sys.argv[2])
        rate = float(sys.argv[3])
        size = int(sys.argv[4])
        if len(sys.argv) == 6:
            src_addr = sys.argv[5]
        else:
            src_addr = '0.0.0.0'

        cli = MonitorClient([.005, .01, .025, .05, .075, .1], (host, port), rate, size, sourceaddr=(src_addr, 0))
        try:
            display_interval = 0.5
            start_time = time.time()
            next_time = start_time
            while True:
                next_time += display_interval
                sleeptime = next_time - time.time()
                if sleeptime > 0:
                    time.sleep(sleeptime)
                bins, average, average_restricted = cli.get_smart_bins(display_interval)
                # One output line: the fields are separated by single spaces.
                fields = ["%7.3f:" % (time.time() - start_time)]
                for i, fraction in enumerate(bins):
                    fields.append("%3i" % (int(100 * fraction)))
                    if i == 2:
                        # Visual separator after the third bin.
                        fields.append(" /")
                fields.append("avg: %5.1f ms" % (1000 * average))
                fields.append("avgr: %5.1f ms" % (1000 * average_restricted))
                # Loss is whatever did not land in any bin but the last.
                fields.append("loss: %6.2f %%" % (100 - 100 * sum(bins[0:-1])))
                sys.stdout.write(" ".join(fields) + "\n")
                sys.stdout.flush()
        finally:
            cli.shutdown()
            sys.stderr.write("Round trip latency summary (packets):\n")
            for i in range(len(cli.latencybins)):
                sys.stderr.write("%.1f ms: %i before %i after\n" % (cli.latencybins[i] * 1000, sum(cli.bins[0:i+1]), sum(cli.bins[i+1:]) + cli.lost))

    except KeyboardInterrupt:
        sys.stderr.write("Exiting on CTRL+C.\n")
00293