Package network_monitor_udp :: Module udpmoncli
[frames] | no frames]

Source Code for Module network_monitor_udp.udpmoncli

  1  #! /usr/bin/env python 
  2   
  3  from __future__ import with_statement 
  4   
  5  import time 
  6  import sys 
  7  import threading 
  8  import socket 
  9  import struct 
 10  import bisect 
 11  import traceback 
 12   
13 -class MonitorSource :
14 - def __init__(self, latencybins, destaddr, rate, pkt_length, ros_returnpath = False, rostopic_prefix = "", roundtrip = False, max_return_time = 0.0, source_id = None, tos = 0, paused = False, sourceaddr = None):
15 self.mutex = threading.Lock() 16 self.outstanding = {} 17 self.arrived = [] 18 self.latencybins = list(latencybins) 19 self.latencybins.sort() 20 self.latencybins.append(1e1000) 21 self.interval = 1.0 / rate 22 self.send_thread = threading.Thread(target = self.send_thread_entry, name = "udpmoncli: send_thread") 23 self.recv_thread = threading.Thread(target = self.recv_thread_entry, name = "udpmoncli: recv_thread") 24 self.dns_thread = threading.Thread(target = self.dns_thread_entry, name = "udpmoncli: dns_thread") 25 self.pkt_length = pkt_length 26 self.exiting = False 27 self.sourceaddr = self.resolve_addr(sourceaddr) 28 self.destaddr = destaddr 29 self.exceptions = set() 30 31 self.source_id = source_id 32 self.roundtrip = roundtrip 33 self.ros_returnpath = ros_returnpath 34 self.tos = tos 35 36 self.max_return_time = max_return_time 37 if roundtrip: 38 self.max_rtt = self.latencybins[-2] 39 else: 40 self.max_rtt = self.latencybins[-2] + max_return_time 41 42 self.sending_too_fast = False 43 44 try: 45 self.destaddr_ip = self.resolve_addr(self.destaddr) 46 except: 47 pass # We'll try again later if it fails. 48 self.lost = 0 49 50 self._reset_bins() 51 52 if self.ros_returnpath: 53 import rospy 54 from msg import UdpSink 55 self.sub = rospy.Subscriber(rostopic_prefix + "udpsink_feedback", UdpSink, self.receive_ros_feedback) 56 while self.sub.get_num_connections() == 0 : # block until subscribing succeeds 57 time.sleep(0.3) 58 continue 59 self.magic, = struct.unpack("=i", struct.pack("BBBB", 0xEF, 0x41, 0xC6, 0x34)) 60 else: 61 self.magic, = struct.unpack("=i", struct.pack("BBBB", 0xEF, 0x41, 0xC6, 0x35)) 62 63 if self.source_id is not None: 64 self.pktstruct = "=iddii" 65 else: 66 self.pktstruct = "=iddi" 67 self.hdr_len = struct.calcsize(self.pktstruct) 68 if (pkt_length < self.hdr_len): 69 print >> sys.stderr, "pkt_length must be at least", self.hdr_len 70 return 71 72 self.cv = threading.Condition() 73 self.paused = True 74 if not paused: 75 self.start_monitor() 76 77 self.window_start = time.time() 78 self.dns_thread.start() 79 self.recv_thread.start() 80 self.send_thread.start()
81
82 - def receive_ros_feedback(self, msg):
83 if msg.source_id != self.source_id: 84 return 85 recv_time = time.time() 86 self.process_rcvd_packet(msg.send_time, msg.echo_time, recv_time, msg.seqnum)
87
88 - def get_source_id(self):
89 return self.source_id
90
91 - def hit_send_rate_limit(self):
92 sending_too_fast = self.sending_too_fast 93 self.sending_too_fast = False 94 return sending_too_fast
95
96 - def set_tx_bandwidth(self, bw):
97 self.interval = (self.pkt_length * 8) / (bw + 0.00001)
98
99 - def resolve_addr(self, addr):
100 if addr == None: 101 return addr 102 host, port = addr 103 return (socket.gethostbyname(host), port)
104
105 - def init_socket(self, sourceaddr = None):
106 if sourceaddr != None and self.sourceaddr != sourceaddr: 107 self.sourceaddr = self.resolve_addr(sourceaddr) 108 109 self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 110 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 111 self.socket.settimeout(0.2) 112 if self.sourceaddr: 113 self.socket.bind(self.sourceaddr) 114 115 if self.tos: 116 try: 117 self.socket.setsockopt(socket.SOL_IP, socket.IP_TOS, self.tos) 118 except socket.error, e: 119 print "Could not set TOS:", str(e)
120
121 - def start_monitor(self, sourceaddr = None):
122 print "Starting UDP monitor" 123 if self.paused: 124 self.init_socket(sourceaddr) 125 with self.cv: 126 self.paused = False 127 self.cv.notify_all()
128
129 - def stop_monitor(self):
130 if not self.paused: 131 self.paused = True 132 self.socket.close()
133
134 - def _reset_bins(self):
135 self.bins = [0 for i in range(0,len(self.latencybins))]
136
137 - def shutdown(self):
138 #print "Udp monitor starting to shut down" 139 self.exiting = True 140 with self.cv: 141 self.cv.notify_all()
142 143 # re-resolve the IP of the base station every 60 seconds.
144 - def dns_thread_entry(self):
145 next_time = time.time() 146 while not self.exiting: 147 try: 148 self.destaddr_ip = self.resolve_addr(self.destaddr) 149 sys.stderr.write("Base station address is %s:%d\n"%self.destaddr_ip) 150 except: 151 sys.stderr.write("Failed to resolve base station address\n") 152 continue 153 time.sleep(15)
154
155 - def send_thread_entry(self):
156 next_time = time.time() 157 seqnum = 0 158 while not self.exiting: 159 try: 160 sleeptime = next_time - time.time() 161 if (sleeptime < -1): 162 self.exceptions.add("Send thread too far behind. Resetting expectations.") 163 self.sending_too_fast = True 164 next_time = time.time() 165 elif sleeptime > 0: 166 time.sleep(sleeptime) 167 if self.paused: 168 with self.cv: 169 if not self.exiting: 170 self.cv.wait() 171 next_time = time.time() 172 continue 173 seqnum = seqnum + 1 174 next_time = next_time + self.interval 175 send_time = time.time() 176 if self.source_id is not None: 177 hdr = struct.pack(self.pktstruct, self.magic, send_time, 0, seqnum, self.source_id) 178 else: 179 hdr = struct.pack(self.pktstruct, self.magic, send_time, 0, seqnum) 180 with self.mutex: 181 self.outstanding[seqnum] = send_time 182 self.socket.sendto(hdr.ljust(self.pkt_length), self.destaddr_ip) 183 except Exception, e: 184 self.exceptions.add(str(e)) 185 print "Got exception in send thread:", e 186 #traceback.print_exc(10) 187 #print self.destaddr 188 #print "Udp monitor send_thread finished shut down" 189 self.socket.close()
190
191 - def recv_thread_entry(self):
192 if self.ros_returnpath: 193 return 194 195 while not self.exiting: 196 if self.paused: 197 with self.cv: 198 if not self.exiting: 199 self.cv.wait() 200 next_time = time.time() 201 continue 202 try: 203 indata = self.socket.recv(4096) 204 recv_time = time.time() 205 if self.source_id is None: 206 (magic, send_time, echo_time, seq_num) = struct.unpack(self.pktstruct, indata[0:self.hdr_len]) 207 else: 208 (magic, send_time, echo_time, seq_num, source_id) = struct.unpack(self.pktstruct, indata[0:self.hdr_len]) 209 if magic != self.magic or (self.source_id is not None and self.source_id != source_id) : 210 continue 211 self.process_rcvd_packet(send_time, echo_time, recv_time, seq_num) 212 except socket.timeout: 213 pass 214 except Exception, e: 215 self.exceptions.add(str(e))
216 #print "Udp monitor recv_thread finished shut down" 217
218 - def process_rcvd_packet(self, send_time, echo_time, recv_time, seq_num):
219 if self.roundtrip: 220 latency = recv_time - send_time 221 else: 222 latency = echo_time - send_time 223 # print warnings for those packets that overran maximum round trip time 224 # due specifically to a large return time 225 if recv_time - send_time > self.max_rtt and latency < self.latencybins[-2]: 226 self.exceptions.add("Packet return time %.2fms exceeds maximum return time of %.2fms"% 227 ((recv_time - echo_time) * 1e3, self.max_return_time * 1e3)) 228 self.bins[bisect.bisect(self.latencybins, latency)] += 1 229 with self.mutex: 230 self.arrived.append((send_time, latency, seq_num))
231
232 - def get_statistics(self):
233 bins = [0 for i in range(0,len(self.latencybins))] 234 with self.mutex: 235 now = time.time() 236 arrived = self.arrived 237 self.arrived = [] 238 outstanding = self.outstanding 239 self.outstanding = {} 240 exceptions = self.exceptions 241 self.exceptions = set() 242 window_end = now - self.max_rtt 243 window_start = self.window_start 244 sum_latency = 0. 245 count = 0 246 sum_latency_restricted = 0. 247 count_restricted = 0 248 249 missed = 0 250 for pkt in arrived: 251 (send_time, latency, seq_num) = pkt 252 if send_time < window_end: 253 count += 1 254 sum_latency += latency 255 bins[bisect.bisect(self.latencybins, latency)] += 1 256 if seq_num in outstanding: 257 if latency <= self.latencybins[-2]: 258 count_restricted += 1 259 sum_latency_restricted += latency 260 elif send_time >= window_start: 261 missed += 1 262 if seq_num in outstanding: 263 del outstanding[seq_num] 264 else: 265 self.arrived.append(pkt) 266 267 for (seq_num, send_time) in outstanding.iteritems(): 268 if send_time < window_end: 269 missed += 1 270 self.lost += 1 271 else: 272 self.outstanding[seq_num] = send_time 273 self.window_start = window_end 274 return Statistics(count, count_restricted, missed, sum_latency, sum_latency_restricted, bins, window_start, window_end, exceptions)
275
276 - def get_smart_bins(self, window):
277 stats = self.get_statistics() 278 for s in stats.exceptions: 279 print "Got exception", s 280 return stats.get_smart_bins()
281
282 -class Statistics:
283 - def __init__(self, count, count_restricted, missed, sum_latency, sum_latency_restricted, bins, window_start, window_end, exceptions):
284 self.exceptions = exceptions 285 self.count = count 286 self.count_restricted = count_restricted 287 self.missed = missed 288 self.sum_latency = sum_latency 289 self.sum_latency_restricted = sum_latency_restricted 290 self.bins = bins 291 self.window_start = window_start 292 self.window_end = window_end
293
294 - def get_window_length(self):
295 return self.window_end - self.window_start
296
297 - def get_percentage_bins(self):
298 denom = self.count_restricted + self.missed 299 if not denom: 300 denom = 1 # bins will be all zero anyhow in this case. 301 denom = float(denom) 302 return [ val / (denom) for val in self.bins ]
303
304 - def get_average_latency(self):
305 if self.count: 306 return self.sum_latency / float(self.count) 307 else: 308 return 0.0
309
311 if self.count_restricted: 312 return self.sum_latency_restricted / float(self.count_restricted) 313 else: 314 return 0.0
315
316 - def get_smart_bins(self):
318
319 - def accumulate(self, extra_stats):
320 """Combines the stats from extra_stats into the existing stats. 321 Expects the time windows to be adjacent.""" 322 if len(extra_stats.bins) != len(self.bins): 323 raise ValueError("Tried to merge Statistics with different bin sizes.") 324 325 if extra_stats.window_end == self.window_start: 326 self.window_start = extra_stats.window_start 327 elif extra_stats.window_start == self.window_end: 328 self.window_end = extra_stats.window_end 329 else: 330 raise ValueError("Tried to accumulate non-adjacent Statistics.") 331 332 for i in range(0, len(self.bins)): 333 self.bins[i] += extra_stats.bins[i] 334 335 self.count += extra_stats.count 336 self.count_restricted += extra_stats.count_restricted 337 self.missed += extra_stats.missed 338 self.sum_latency += extra_stats.sum_latency 339 self.sum_latency_restricted += extra_stats.sum_latency_restricted 340 self.exceptions |= extra_stats.exceptions
341
342 -class MonitorClient(MonitorSource):
343 - def __init__(self, latencybins, destaddr, rate, pkt_length, paused = False, sourceaddr = None):
344 MonitorSource.__init__(self, latencybins, destaddr, rate, pkt_length, paused=paused, sourceaddr=sourceaddr, roundtrip = True)
345 346 347 if __name__ == "__main__": 348 try: 349 if not len(sys.argv) in [5, 6]: 350 print "usage: udpmoncli.py <host> <port> <pkt_rate> <pkt_size> [<src_addr>]" 351 sys.exit(1) 352 host = sys.argv[1] 353 port = int(sys.argv[2]) 354 rate = float(sys.argv[3]) 355 size = int(sys.argv[4]) 356 if len(sys.argv) == 6: 357 src_addr = sys.argv[5] 358 else: 359 src_addr = '0.0.0.0' 360 #cli = MonitorClient([.005, .01, .1, .2, .3, .4, .5], (host, int(port)), rate, size) 361 cli = MonitorClient([.005, .01, .025, .05, .075, .1], (host, int(port)), rate, size, sourceaddr = (src_addr, 0)) 362 try: 363 display_interval = 0.5 364 start_time = time.time() 365 next_time = start_time 366 while True: 367 next_time = next_time + display_interval 368 sleeptime = next_time - time.time() 369 if sleeptime > 0: 370 time.sleep(sleeptime) 371 if 0: 372 bins = cli.get_bins() 373 else: 374 bins, average, average_restricted = cli.get_smart_bins(display_interval) 375 print "%7.3f:"%(time.time() - start_time), 376 for i in range(0,len(bins)): 377 print "%3i"%(int(100*bins[i])), 378 if i == 2: 379 print " /", 380 print "avg: %5.1f ms"%(1000*average), "avgr: %5.1f ms"%(1000*average_restricted), "loss: %6.2f %%"%(100 - 100 * sum(bins[0:-1])) 381 sys.stdout.flush() 382 finally: 383 cli.shutdown() 384 print >> sys.stderr, "Round trip latency summary (packets):" 385 for i in range(0, len(cli.latencybins)): 386 print >> sys.stderr, "%.1f ms: %i before %i after"%(cli.latencybins[i] * 1000, sum(cli.bins[0:i+1]), sum(cli.bins[i+1:]) + cli.lost) 387 388 except KeyboardInterrupt: 389 print >> sys.stderr, "Exiting on CTRL+C." 390