1
2
3 import roslib; roslib.load_manifest('network_monitor_udp')
4 from roslib import rostime
5 import rospy
6 import actionlib
7 import network_monitor_udp.msg as msgs
8 from network_monitor_udp.msg import LinktestGoal
9 import time
10 import math
11
13 """
14 Maintains a log of measurements for a variable. For each measurement, the
15 time and value of the variable is recorded. Various statistics are computed
16 for the recorded data.
17 """
18 ALPHA = 0.1
19 MOVING_STATS_SAMPLES = 10
20
22 self.reset_all()
23
25 """
26 Resets the aggregated statistics.
27 """
28 self._min = 1e1000
29 self._max = -1e1000
30 self._avg = 0.0
31 self._expavg = 0.0
32 self._count = 0
33 self._sum = 0.0
34 self._curr = 0.0
35
36 - def reset_history(self):
37 """
38 Deletes all measurements recorded so far.
39 """
40 self._history = []
41
48
49 - def record(self, measurement, meas_time = None):
50 """
51 Records a measurement.
52
53 @type measurement: float
54 @param measurement: the value being recorded
55 @type meas_time: float
56 @param meas_time: the time of the measurement (if not specified the current time
57 as retrieved by time.time() will be used)
58 """
59 if meas_time is None:
60 meas_time = time.time()
61
62 self._sum += measurement
63 self._count += 1
64 self._avg = self._sum / self._count
65 self._max = max(self._max, measurement)
66 self._min = min(self._min, measurement)
67 if self._expavg == 0.0:
68 self._expavg = measurement
69 else:
70 self._expavg = MetricLog.ALPHA * measurement + (1 - MetricLog.ALPHA) * self._expavg
71
72 self._history.append((measurement, meas_time))
73 self._curr = measurement
74
76 """
77 Returns the moving average of the recorded measurements.
78
79 @type samples: int
80 @param samples: the number of samples in the moving average
81
82 @rtype: float
83 @return: moving average
84 """
85 try:
86 return sum(rec[0] for rec in self._history[-samples:])/samples
87 except IndexError:
88 return 0.0
89
91 """
92 Returns the moving standard deviation of the recorded measurements.
93
94 @type samples: int
95 @param samples: the number of samples in the moving standard deviation
96
97 @rtype: float
98 @return: moving standard deviation
99 """
100 try:
101 return math.sqrt(sum([(x[0] - self.movavg())**2 for x in self._history[-MetricLog.MOVING_STATS_SAMPLES:]])/
102 (MetricLog.MOVING_STATS_SAMPLES - 1))
103 except IndexError:
104 return 0.0
105
107 """
108 Returns the standard deviation of the recorded measurements.
109
110 @rtype: float
111 @return: standard deviation
112 """
113 try:
114 return math.sqrt(sum([(x[0] - self._avg)**2 for x in self._history])/(self._count - 1))
115 except ZeroDivisionError:
116 return 0.0
117
119 """
120 @rtype: boolean
121 @return: True if the measurement log is empty
122 """
123 return self._count == 0
124
126 """
127 Returns the time of the first measurement recorded.
128
129 @rtype: float
130 @return: First measurement time or 0.0 if the log is empty.
131 """
132 try:
133 return self._history[0][1]
134 except IndexError:
135 return 0.0
136
138 """
139 Returns the time of the last measurement recorded.
140
141 @rtype: float
142 @return: Last measurement time or 0.0 if the log is empty.
143 """
144 try:
145 return self._history[-1][1]
146 except IndexError:
147 return 0.0
148
150 """
151 Returns the period during which measurements where made (the time
152 elapsed from the first to the last measurement).
153
154 @rtype: float
155 @return: duration in seconds
156 """
157 try:
158 return self.max_time() - self.min_time()
159 except IndexError:
160 return 0.0
161
163 """
164 @rtype: float
165 @return: minimum value in the log
166 """
167 return self._min
168
170 """
171 @rtype: float
172 @return: maximum value in the log
173 """
174 return self._max
175
177 """
178 @rtype: float
179 @return: the mean of the values in the log
180 """
181 return self._avg
182
184 """
185 @rtype: float
186 @return: the exponential average of the values in the log
187 """
188 return self._expavg
189
191 """
192 @rtype: float
193 @return: the number of values in the log
194 """
195 return self._count
196
198 """
199 @rtype: float
200 @return: the last value recorded
201 """
202 return self._curr
203
205 """
206 @rtype: [(float, float)]
207 @return: the list of measurements recorded. Each measurement is a tuple (value, measurement_time).
208 """
209 return self._history
210
212 """
213 This class implements a udpmon link test. It handles communication
214 with the udpmonsourcenode via actionlib. Feedback information
215 (bandwidth, loss, latency) is recorded into L{MetricLog} objects. The
216 class has support for pre-empting a running test.
217 """
218 - def __init__(self, name, goal, actionclient):
219 """
220 Builds a LinkTest object. The test will not be active until explicitly
221 started with the L{start} function.
222
223 @type name: string
224 @param name: test name (used for logging information)
225 @type goal: LinktestGoal
226 @param goal: the goal object describing the link test
227 @type actionclient: actionlib.ActionClient
228 @param actionclient: the action client object
229 """
230 self.name = name
231 self.goal = goal
232 self.actionclient = actionclient
233 self.custom_feedback_handler = None
234
235 self.latency = MetricLog()
236 """Latency measurement log. \n@type: L{MetricLog}"""
237 self.loss = MetricLog()
238 """Packet loss measurement log. \n@type: L{MetricLog}"""
239 self.bandwidth = MetricLog()
240 """Bandwidth measurement log. \n@type: L{MetricLog}"""
241
242 self.overall_latency = None
243 self.overall_loss = None
244 self.overall_bandwidth = None
245 self.overall_latency_histogram = None
246
247 self.started = False
248 self.done = False
249
251 """
252 Starts the test.
253 """
254 self.goal_handle = self.actionclient.send_goal(self.goal, self._handle_link_transition, self._handle_link_feedback)
255 self.started = True
256
258 """
259 Stops (preempts) a running test.
260 """
261 if self.started:
262 self.goal_handle.cancel()
263
271
289
299
301 """
302 @rtype: boolean
303 @return: True if the link is currently down (last measured bandwidth is close to zero).
304 """
305 return self.bandwidth.curr() < 0.00001
306
308 """
309 Sets a custom feedback handler.
310
311 @type custom_feedback_handler: func(GoalHandle, LinktestFeedback)
312 @param: the custom feedback handler that gets called whenever feedback is received
313 """
314 self.custom_feedback_handler = custom_feedback_handler
315
316
318 """
319 This class maintains a handle to a L{udpmonsourcenode.py} action server.
320 It is used to create link tests that have as source a particular C{udpmonsourcenode.py} node.
321 """
322
323 - def __init__(self, action_name = "performance_test"):
324 """
325 Creates an L{actionlib.ActionClient} object and waits for connection to the action server.
326
327 @raise Exception: if the connection does not succeed
328
329 @type action_name: str
330 @param action_name: action server name, by default this is C{"performance_test"}, but if the action
331 server lives in a different namespace than the full path should be given
332 (e.g. C{"/source2/performance_test"})
333 """
334 self.actionclient = actionlib.ActionClient(action_name, msgs.LinktestAction)
335 self.count = 0
336 started = self.actionclient.wait_for_server()
337 if started is False:
338 raise Exception, "could not connect to action server"
339
344
346 """
347 Creates and returns a link test. The link test will need to be started with its C{start()} function.
348
349 @type name: str
350 @param name: an optional test name. This name will be used for log mesages.
351
352 @type kwargs: dict
353 @param kwargs: a dictionary of test parameters. These are the .action file parameters. See the wiki page
354 at U{http://www.ros.org/wiki/network_monitor_udp} for a full description.
355 """
356 if name is None:
357 name = self._generate_name("bwtest")
358 goal = msgs.LinktestGoal(**kwargs)
359 test = LinkTest(name, goal, self.actionclient)
360 return test
361
363 """
364 Cancels all currently running tests. As a precaution, this function should be
365 called at the beginning of any test "session" as there may be orphan link tests
366 still being run from the C{udpmonsourcenode.py} node, if that node has not been restarted.
367 """
368 self.actionclient.cancel_all_goals()
369
370
371 time.sleep(2)
372
374 """
375 This is a convenience function that creates an adaptive bandwidth test in order to saturate
376 and determine a link's capacity. By default, the parameters of the test are
377
378 - one-way test (not roundtrip)
379 - uses ROS for return path instead of UDP
380 - update_interval: 0.2 seconds
381 - packet loss threshold: 0.5%
382 - latency threshold: 30ms
383 - initial bandwidth: 0.5Mbit/s
384 - maximum duration: 30 seconds
385
386 This function should return faster than the specified timeout if the conditions for link saturation
387 are fulfilled (i.e. it hits a bandwidth ceiling for a specified period of time).
388
389 @type kwargs: dict
390 @param kwargs: a list of test parameters
391
392 @rtype: float
393 @return: the link capacity if it could be determined or 0.0, otherwise.
394 """
395 name = self._generate_name("link_capacity_test")
396 test_params = { "update_interval" : 0.2, "ros_returnpath" : True, "bw" : 0.5*10**6,
397 "bw_type" : LinktestGoal.BW_ADAPTIVE,
398 "latency_threshold" : 30e-3, "pktloss_threshold" : 0.5, "duration": 30.0 }
399 test_params.update(kwargs)
400 goal = msgs.LinktestGoal(**test_params)
401 test = LinkTest(name, goal, self.actionclient)
402 bwlog = test.bandwidth
403
404 link_capacity = 0.0
405 test.start()
406 while not test.done:
407 time.sleep(0.5)
408 if bwlog.duration() < 5.0:
409 continue
410 if bwlog.max() < 0.0001:
411 break
412 if bwlog.curr() / bwlog.max() > 0.5 and bwlog.curr() / bwlog.expavg() < 1.0:
413 link_capacity = bwlog.expavg()
414 break
415 if not test.done:
416 test.stop()
417
418 return link_capacity
419