1
2
3 from __future__ import with_statement
4
5 import time
6 import sys
7 import threading
8 import socket
9 import struct
10 import bisect
11 import traceback
12
14 - def __init__(self, latencybins, destaddr, rate, pkt_length, ros_returnpath = False, rostopic_prefix = "", roundtrip = False, max_return_time = 0.0, source_id = None, tos = 0, paused = False, sourceaddr = None):
15 self.mutex = threading.Lock()
16 self.outstanding = {}
17 self.arrived = []
18 self.latencybins = list(latencybins)
19 self.latencybins.sort()
20 self.latencybins.append(1e1000)
21 self.interval = 1.0 / rate
22 self.send_thread = threading.Thread(target = self.send_thread_entry, name = "udpmoncli: send_thread")
23 self.recv_thread = threading.Thread(target = self.recv_thread_entry, name = "udpmoncli: recv_thread")
24 self.dns_thread = threading.Thread(target = self.dns_thread_entry, name = "udpmoncli: dns_thread")
25 self.pkt_length = pkt_length
26 self.exiting = False
27 self.sourceaddr = self.resolve_addr(sourceaddr)
28 self.destaddr = destaddr
29 self.exceptions = set()
30
31 self.source_id = source_id
32 self.roundtrip = roundtrip
33 self.ros_returnpath = ros_returnpath
34 self.tos = tos
35
36 self.max_return_time = max_return_time
37 if roundtrip:
38 self.max_rtt = self.latencybins[-2]
39 else:
40 self.max_rtt = self.latencybins[-2] + max_return_time
41
42 self.sending_too_fast = False
43
44 try:
45 self.destaddr_ip = self.resolve_addr(self.destaddr)
46 except:
47 pass
48 self.lost = 0
49
50 self._reset_bins()
51
52 if self.ros_returnpath:
53 import rospy
54 from msg import UdpSink
55 self.sub = rospy.Subscriber(rostopic_prefix + "udpsink_feedback", UdpSink, self.receive_ros_feedback)
56 while self.sub.get_num_connections() == 0 :
57 time.sleep(0.3)
58 continue
59 self.magic, = struct.unpack("=i", struct.pack("BBBB", 0xEF, 0x41, 0xC6, 0x34))
60 else:
61 self.magic, = struct.unpack("=i", struct.pack("BBBB", 0xEF, 0x41, 0xC6, 0x35))
62
63 if self.source_id is not None:
64 self.pktstruct = "=iddii"
65 else:
66 self.pktstruct = "=iddi"
67 self.hdr_len = struct.calcsize(self.pktstruct)
68 if (pkt_length < self.hdr_len):
69 print >> sys.stderr, "pkt_length must be at least", self.hdr_len
70 return
71
72 self.cv = threading.Condition()
73 self.paused = True
74 if not paused:
75 self.start_monitor()
76
77 self.window_start = time.time()
78 self.dns_thread.start()
79 self.recv_thread.start()
80 self.send_thread.start()
81
87
90
92 sending_too_fast = self.sending_too_fast
93 self.sending_too_fast = False
94 return sending_too_fast
95
97 self.interval = (self.pkt_length * 8) / (bw + 0.00001)
98
100 if addr == None:
101 return addr
102 host, port = addr
103 return (socket.gethostbyname(host), port)
104
106 if sourceaddr != None and self.sourceaddr != sourceaddr:
107 self.sourceaddr = self.resolve_addr(sourceaddr)
108
109 self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
110 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
111 self.socket.settimeout(0.2)
112 if self.sourceaddr:
113 self.socket.bind(self.sourceaddr)
114
115 if self.tos:
116 try:
117 self.socket.setsockopt(socket.SOL_IP, socket.IP_TOS, self.tos)
118 except socket.error, e:
119 print "Could not set TOS:", str(e)
120
122 print "Starting UDP monitor"
123 if self.paused:
124 self.init_socket(sourceaddr)
125 with self.cv:
126 self.paused = False
127 self.cv.notify_all()
128
130 if not self.paused:
131 self.paused = True
132 self.socket.close()
133
135 self.bins = [0 for i in range(0,len(self.latencybins))]
136
138
139 self.exiting = True
140 with self.cv:
141 self.cv.notify_all()
142
143
145 next_time = time.time()
146 while not self.exiting:
147 try:
148 self.destaddr_ip = self.resolve_addr(self.destaddr)
149 sys.stderr.write("Base station address is %s:%d\n"%self.destaddr_ip)
150 except:
151 sys.stderr.write("Failed to resolve base station address\n")
152 continue
153 time.sleep(15)
154
156 next_time = time.time()
157 seqnum = 0
158 while not self.exiting:
159 try:
160 sleeptime = next_time - time.time()
161 if (sleeptime < -1):
162 self.exceptions.add("Send thread too far behind. Resetting expectations.")
163 self.sending_too_fast = True
164 next_time = time.time()
165 elif sleeptime > 0:
166 time.sleep(sleeptime)
167 if self.paused:
168 with self.cv:
169 if not self.exiting:
170 self.cv.wait()
171 next_time = time.time()
172 continue
173 seqnum = seqnum + 1
174 next_time = next_time + self.interval
175 send_time = time.time()
176 if self.source_id is not None:
177 hdr = struct.pack(self.pktstruct, self.magic, send_time, 0, seqnum, self.source_id)
178 else:
179 hdr = struct.pack(self.pktstruct, self.magic, send_time, 0, seqnum)
180 with self.mutex:
181 self.outstanding[seqnum] = send_time
182 self.socket.sendto(hdr.ljust(self.pkt_length), self.destaddr_ip)
183 except Exception, e:
184 self.exceptions.add(str(e))
185 print "Got exception in send thread:", e
186
187
188
189 self.socket.close()
190
192 if self.ros_returnpath:
193 return
194
195 while not self.exiting:
196 if self.paused:
197 with self.cv:
198 if not self.exiting:
199 self.cv.wait()
200 next_time = time.time()
201 continue
202 try:
203 indata = self.socket.recv(4096)
204 recv_time = time.time()
205 if self.source_id is None:
206 (magic, send_time, echo_time, seq_num) = struct.unpack(self.pktstruct, indata[0:self.hdr_len])
207 else:
208 (magic, send_time, echo_time, seq_num, source_id) = struct.unpack(self.pktstruct, indata[0:self.hdr_len])
209 if magic != self.magic or (self.source_id is not None and self.source_id != source_id) :
210 continue
211 self.process_rcvd_packet(send_time, echo_time, recv_time, seq_num)
212 except socket.timeout:
213 pass
214 except Exception, e:
215 self.exceptions.add(str(e))
216
217
231
233 bins = [0 for i in range(0,len(self.latencybins))]
234 with self.mutex:
235 now = time.time()
236 arrived = self.arrived
237 self.arrived = []
238 outstanding = self.outstanding
239 self.outstanding = {}
240 exceptions = self.exceptions
241 self.exceptions = set()
242 window_end = now - self.max_rtt
243 window_start = self.window_start
244 sum_latency = 0.
245 count = 0
246 sum_latency_restricted = 0.
247 count_restricted = 0
248
249 missed = 0
250 for pkt in arrived:
251 (send_time, latency, seq_num) = pkt
252 if send_time < window_end:
253 count += 1
254 sum_latency += latency
255 bins[bisect.bisect(self.latencybins, latency)] += 1
256 if seq_num in outstanding:
257 if latency <= self.latencybins[-2]:
258 count_restricted += 1
259 sum_latency_restricted += latency
260 elif send_time >= window_start:
261 missed += 1
262 if seq_num in outstanding:
263 del outstanding[seq_num]
264 else:
265 self.arrived.append(pkt)
266
267 for (seq_num, send_time) in outstanding.iteritems():
268 if send_time < window_end:
269 missed += 1
270 self.lost += 1
271 else:
272 self.outstanding[seq_num] = send_time
273 self.window_start = window_end
274 return Statistics(count, count_restricted, missed, sum_latency, sum_latency_restricted, bins, window_start, window_end, exceptions)
275
281
283 - def __init__(self, count, count_restricted, missed, sum_latency, sum_latency_restricted, bins, window_start, window_end, exceptions):
284 self.exceptions = exceptions
285 self.count = count
286 self.count_restricted = count_restricted
287 self.missed = missed
288 self.sum_latency = sum_latency
289 self.sum_latency_restricted = sum_latency_restricted
290 self.bins = bins
291 self.window_start = window_start
292 self.window_end = window_end
293
295 return self.window_end - self.window_start
296
298 denom = self.count_restricted + self.missed
299 if not denom:
300 denom = 1
301 denom = float(denom)
302 return [ val / (denom) for val in self.bins ]
303
305 if self.count:
306 return self.sum_latency / float(self.count)
307 else:
308 return 0.0
309
311 if self.count_restricted:
312 return self.sum_latency_restricted / float(self.count_restricted)
313 else:
314 return 0.0
315
318
320 """Combines the stats from extra_stats into the existing stats.
321 Expects the time windows to be adjacent."""
322 if len(extra_stats.bins) != len(self.bins):
323 raise ValueError("Tried to merge Statistics with different bin sizes.")
324
325 if extra_stats.window_end == self.window_start:
326 self.window_start = extra_stats.window_start
327 elif extra_stats.window_start == self.window_end:
328 self.window_end = extra_stats.window_end
329 else:
330 raise ValueError("Tried to accumulate non-adjacent Statistics.")
331
332 for i in range(0, len(self.bins)):
333 self.bins[i] += extra_stats.bins[i]
334
335 self.count += extra_stats.count
336 self.count_restricted += extra_stats.count_restricted
337 self.missed += extra_stats.missed
338 self.sum_latency += extra_stats.sum_latency
339 self.sum_latency_restricted += extra_stats.sum_latency_restricted
340 self.exceptions |= extra_stats.exceptions
341
343 - def __init__(self, latencybins, destaddr, rate, pkt_length, paused = False, sourceaddr = None):
345
346
347 if __name__ == "__main__":
348 try:
349 if not len(sys.argv) in [5, 6]:
350 print "usage: udpmoncli.py <host> <port> <pkt_rate> <pkt_size> [<src_addr>]"
351 sys.exit(1)
352 host = sys.argv[1]
353 port = int(sys.argv[2])
354 rate = float(sys.argv[3])
355 size = int(sys.argv[4])
356 if len(sys.argv) == 6:
357 src_addr = sys.argv[5]
358 else:
359 src_addr = '0.0.0.0'
360
361 cli = MonitorClient([.005, .01, .025, .05, .075, .1], (host, int(port)), rate, size, sourceaddr = (src_addr, 0))
362 try:
363 display_interval = 0.5
364 start_time = time.time()
365 next_time = start_time
366 while True:
367 next_time = next_time + display_interval
368 sleeptime = next_time - time.time()
369 if sleeptime > 0:
370 time.sleep(sleeptime)
371 if 0:
372 bins = cli.get_bins()
373 else:
374 bins, average, average_restricted = cli.get_smart_bins(display_interval)
375 print "%7.3f:"%(time.time() - start_time),
376 for i in range(0,len(bins)):
377 print "%3i"%(int(100*bins[i])),
378 if i == 2:
379 print " /",
380 print "avg: %5.1f ms"%(1000*average), "avgr: %5.1f ms"%(1000*average_restricted), "loss: %6.2f %%"%(100 - 100 * sum(bins[0:-1]))
381 sys.stdout.flush()
382 finally:
383 cli.shutdown()
384 print >> sys.stderr, "Round trip latency summary (packets):"
385 for i in range(0, len(cli.latencybins)):
386 print >> sys.stderr, "%.1f ms: %i before %i after"%(cli.latencybins[i] * 1000, sum(cli.bins[0:i+1]), sum(cli.bins[i+1:]) + cli.lost)
387
388 except KeyboardInterrupt:
389 print >> sys.stderr, "Exiting on CTRL+C."
390