periodic.py
Go to the documentation of this file.
1 #!/bin/env python
2 
3 #**********************************************************************
4 # Copyright (c) 2013 InterWorking Labs, All Rights Reserved *
5 # *
6 # RESTRICTED RIGHTS LEGEND *
7 # *
8 # This software is provided with RESTRICTED RIGHTS. *
9 # *
10 # Use, duplication, or disclosure by the U.S. Government is subject *
11 # to restrictions set forth in subparagraph (c)(1)(ii) of the Rights *
12 # in Technical Data and Computer Software clause at DFARS *
13 # 252.227-7013 or subparagraphs (c)(1) and (2) of the Commercial *
14 # Computer Software - Restricted Rights at 48 CFR 52.227-19, as *
15 # applicable. The "Manufacturer" for purposes of these regulations *
16 # is InterWorking Labs, PO Box 66190, Scotts Valley, California 95060 *
17 # U.S.A. *
18 #**********************************************************************
19 
20 #**********************************************************************
21 # $Id: periodic.py 462 2013-11-17 08:50:51Z karl $
22 #**********************************************************************
23 
24 # http://stackoverflow.com/questions/2398661/schedule-a-repeating-event-in-python-3
25 
26 import argparse
27 import datetime
28 import sys
29 import syslog
30 import threading
31 import time
32 import traceback
33 
34 import mm2client
35 import setfilters
36 
37 #**********************************************************************
38 # SetupJobs()
39 #
40 # Here is where we set up the jobs to be done and the schedule to
41 # do them.
42 #**********************************************************************
43 def SetupJobs(Jobs, mm_hostname):
44 
45  #******************************
46  # Do not change these...
47  LANA_to_LANB = True
48  LANB_to_LANA = False
49  #******************************
50 
51  BandSettings = mm2client.Bands()
52 
53  # A Band is for our 100ms/1megabit path
54  # B band for the 1000ms/1megabit path
55  A_Band = 1
56  B_Band = 2
57 
58  #**************************************************
59  # Here we set rate limitation value that we will use
60  #**************************************************
61  OneMegabit = 1000000 # Some might argue that one megabit is 1048576 (2**20)
62  OneKilobit = 100000 # Some might argue that one kilobit is 1024000
63 
64  #**************************************************
65  # Here we set the two delay values.
66  # Note: These are round trip times.
67  #**************************************************
68  LongRoundtrip = 1000
69  ShortRoundtrip = 100
70 
71  # We will divide the round trip into two equal one-way parts.
72  HalfLongRoundtrip = LongRoundtrip/2
73  HalfShortRoundtrip = ShortRoundtrip/2
74 
75  #**************************************************
76  # Set up our classification filters.
77  # We will be using these as a two-way switch to
78  # send selected packets first into the A band and then
79  # into the B band and then back to the A band etc.
80  # We will let any other forms of traffic flow via
81  # Band 5 which will get no impairments.
82  # There things that don't get picked up by the
83  # filters below are almost certainly low traffic
84  # things like OSPF.
85  #**************************************************
86  Filt_ToBandA = [setfilters.FiltSetting("arp", A_Band),
87  setfilters.FiltSetting("ipv4", A_Band),
88  setfilters.FiltSetting("ipv6", A_Band)
89  ]
90 
91  Filt_ToBandB = [setfilters.FiltSetting("arp", B_Band),
92  setfilters.FiltSetting("ipv4", B_Band),
93  setfilters.FiltSetting("ipv6", B_Band)
94  ]
95 
96  # Define A_Band to have 100ms round trip, 1megabit/second rate limit
97  BandSettings.SetDelayAmount(A_Band, LANA_to_LANB, HalfShortRoundtrip)
98  BandSettings.SetDelayAmount(A_Band, LANB_to_LANA, HalfShortRoundtrip)
99  BandSettings.SetDelayReorder(A_Band, LANA_to_LANB, False)
100  BandSettings.SetDelayReorder(A_Band, LANB_to_LANA, False)
101  BandSettings.SetRateLimit(A_Band, LANA_to_LANB, OneMegabit)
102  BandSettings.SetRateLimit(A_Band, LANB_to_LANA, OneMegabit)
103 
104  # Define B_Band with 1000ms round trip, 1kilobit/second rate limit
105  BandSettings.SetDelayAmount(B_Band, LANA_to_LANB, HalfLongRoundtrip)
106  BandSettings.SetDelayAmount(B_Band, LANB_to_LANA, HalfLongRoundtrip)
107  BandSettings.SetDelayReorder(B_Band, LANA_to_LANB, False)
108  BandSettings.SetDelayReorder(B_Band, LANB_to_LANA, False)
109  BandSettings.SetRateLimit(B_Band, LANA_to_LANB, OneKilobit)
110  BandSettings.SetRateLimit(B_Band, LANB_to_LANA, OneKilobit)
111 
112  #**************************************************
113  # Create an initial job at start-time zero.
114  # Here is where we establish our baseline.
115  # Note that this baseline is a different thing than
116  # the defaults, which are simply the absence of
117  # filters and impairments.
118  #**************************************************
119  Jobs.AddRequest("Establish Baseline - 100ms RT delay, 1,000,000 bit/sec rate limit", mm_hostname, 0,
120  BandSettings, Filt_ToBandA, Filt_ToBandA)
121 
122  #**************************************************
123  # Set up the repeating schedule of jobs
124  #**************************************************
125  interval = 60 # Seconds per item
126  items_per_group = 2
127  num_groups = 720 # Number of groups ==> 24 hours
128 
129  #Remember range() stops at < stop value not <=
130  for secs in range(interval, items_per_group*num_groups*interval, items_per_group*interval):
131  Jobs.AddRequest("Job %u: 1000ms RT delay, 100,000 bit/sec rate limit" % \
132  secs, mm_hostname, secs, None, Filt_ToBandB, Filt_ToBandB)
133  nxtsecs = secs + interval
134  Jobs.AddRequest("Job %u: 100ms RT delay, 1,000,000 bit/sec rate limit" % \
135  nxtsecs, mm_hostname, nxtsecs, None, Filt_ToBandA, Filt_ToBandA)
136  #Jobs.PrintMe()
137 
138 #**********************************************************************
139 # ShowMessage()
140 #**********************************************************************
141 UseSyslog = False # Set to True to send things to the syslog
142 TeamName = "TEAM UNKNOWN" # Should be set to the actual team name
143 
144 def ShowMessage(*args):
145  datetime.datetime.now().isoformat()
146  msg = datetime.datetime.now().isoformat() \
147  + ' "' + TeamName + '" ' \
148  + " ".join(args)
149  print msg
150  if UseSyslog:
151  syslog.syslog(syslog.LOG_INFO, msg)
152 
153 #**********************************************************************
154 # RepeatedTimer
155 #
156 # Features:
157 # - Standard library only, no external dependencies.
158 # - Start() and stop() are safe to call multiple times even if the
159 # timer has already started/stopped.
160 # - Function to be called can have positional and named arguments.
161 # - You can change interval anytime, it will be effective after
162 # next run. Same for args, kwargs and even the function.
163 #**********************************************************************
164 class RepeatedTimer(object):
165  def __init__(self, interval, function, *args, **kwargs):
166  self._timer = None
167  self.function = function
168  self.interval = interval
169  self.args = args
170  self.kwargs = kwargs
171  self.is_running = False
172  self.start()
173 
174  def _run(self):
175  self.is_running = False
176  self.start()
177  self.function(*self.args, **self.kwargs)
178 
179  def start(self):
180  if not self.is_running:
181  self._timer = threading.Timer(self.interval, self._run)
182  self._timer.start()
183  self.is_running = True
184 
185  def stop(self):
186  self._timer.cancel()
187  self.is_running = False
188 
189 
190 #**********************************************************************
191 # DoRequest
192 #**********************************************************************
193 class DoRequest(object):
194  def __init__(self, reqname, mm2host, dowhen, bands, a2b_filtmap, b2a_filtmap):
195  """
196  @param reqname: The name of the request, a single line of text.
197  @type reqname: String
198  @param mm2host: The name or IP address of the target host.
199  @type mm2host: String
200  @param dowhen: A datetime object indicating when to do perform this request.
201  @type dowhen: datetime
202  @param bands: A mm2client.Bands object containing the desired impairment settings.
203  May be None.
204  @type bands: mm2client.Bands
205  @param a2b_filtmap: A sequence of setfilters.FiltSetting objects containing the desired LanA to LanB filters settings.
206  May be an empty sequence.
207  @type a2b_filtmap: A sequence of setfilters.FiltSetting objects.
208  @param b2a_filtmap: A sequence of setfilters.FiltSetting objects containing the desired LanB to LanA filters settings.
209  May be an empty sequence.
210  @type b2a_filtmap: A sequence of setfilters.FiltSetting objects.
211  """
212  self.__Name = reqname
213  self.__MM2HostName = mm2host
214  self.__DoWhen = dowhen
215  self.__Bands = bands
216  self.__A2BFiltmap = a2b_filtmap
217  self.__B2AFiltmap = b2a_filtmap
218  self.__Done = False
219 
220  def __lt__(self, other):
221  return self.__DoWhen < other.__DoWhen
222 
223  def __le__(self, other):
224  return self.__DoWhen <= other.__DoWhen
225 
226  def __eq__(self, other):
227  return self.__DoWhen == other.__DoWhen
228 
229  def __ne__(self, other):
230  return self.__DoWhen != other.__DoWhen
231 
232  def __gt__(self, other):
233  return self.__DoWhen > other.__DoWhen
234 
235  def __ge__(self, other):
236  return self.__DoWhen <= other.__DoWhen
237 
238  def __repr__(self):
239  return repr({
240  "Name": self.__Name,
241  "MM2HostName": self.__MM2HostName,
242  "DoWhen": self.__DoWhen,
243  "Bands": repr(self.__Bands),
244  "A2BFiltMap": repr(self.__A2BFiltmap),
245  "B2AFiltMap": repr(self.__B2AFiltmap),
246  "Done": self.__Done
247  })
248 
249  def __str__(self):
250  return self.__repr__()
251 
252  @property
253  def Name(self):
254  return self.__Name
255 
256  @property
257  def MM2HostName(self):
258  return self.__MM2HostName
259 
260  @property
261  def DoWhen(self):
262  return self.__DoWhen
263 
264  @property
265  def Bands(self):
266  return self.__Bands
267 
268  @property
269  def A2BFiltmap(self):
270  return self.__A2BFiltmap
271 
272  @property
273  def B2AFiltmap(self):
274  return self.__B2AFiltmap
275 
276  @property
277  def IsDone(self):
278  return self.__Done
279 
280  def IsReadyToGo(self, now):
281  return (not self.__Done) and (self.__DoWhen <= now)
282 
283  def Run(self, now):
284  global AllFilterNames
285  assert self.IsReadyToGo(now)
286  self.__Done = True
287  SetMM(self.__MM2HostName, self.__Bands,
288  self.__A2BFiltmap, self.__B2AFiltmap, AllFilterNames)
289 
290 #**********************************************************************
291 # RunList
292 #**********************************************************************
293 class RunList(object):
294  def __init__(self, now, skipsecs):
295  self.__WorkList = []
296  self.__StartTime = now
297  if (skipsecs is not None) and (skipsecs > 0):
298  self.__StartTime = self.__StartTime - datetime.timedelta(0, skipsecs)
299 
300  def AddRequest(self, reqname, mm2host, startsec, bands,
301  a2b_filtmap, b2a_filtmap):
302  dowhen = self.__StartTime + datetime.timedelta(0, startsec)
303  self.__WorkList.append(DoRequest(reqname, mm2host, dowhen, bands,
304  a2b_filtmap, b2a_filtmap))
305  self.__WorkList.sort()
306 
307  def RunNextRequest(self, nowtime):
308  # timedelta = nowtime - self.__StartTime
309  # We are going to do a sequential scan of the work list to
310  # find the first one that has not already been done and
311  # that has reached its start time.
312  # This check happens about once a second.
313  for ndx in range(len(self.__WorkList)):
314  req = self.__WorkList[ndx]
315  if req.IsReadyToGo(nowtime):
316  ShowMessage("Starting task \"%s\"" % req.Name)
317  try:
318  req.Run(nowtime)
319  except Exception, err:
320  ShowMessage("Exception in Task \"%s\" - %s" % (req.Name, str(err)))
321  del self.__WorkList[ndx]
322  return True
323  return False
324 
325  @property
327  if len(self.__WorkList) == 0:
328  return datetime.datetime.now()
329  return self.__WorkList[-1].DoWhen
330 
331  def PrintMe(self):
332  for d in self.__WorkList:
333  ShowMessage("RunRequest", str(d))
334 
335 #**********************************************************************
336 # SetMM()
337 #**********************************************************************
338 def SetMM(mm_hostname, bands, a2bfilt_vals, b2afilt_vals, all_filter_names):
339 
340  if bands is not None:
341  mm2client.ChangeBandsOnMM(bands, mm_hostname)
342  if (a2bfilt_vals is not None) or (b2afilt_vals is not None):
343  setfilters.SetFiltMap(mm_hostname, a2bfilt_vals, b2afilt_vals,
344  all_filter_names)
345 
346 #**********************************************************************
347 # We will try to avoid redundant queries to fetch filter names by
348 # caching it.
349 # It is simply a list of strings.
350 # A value of None means that it will be fetched each time rather than
351 # cached.
352 #**********************************************************************
353 AllFilterNames = None
354 
355 #**********************************************************************
356 # main()
357 #**********************************************************************
358 def main():
359  global UseSyslog
360  global AllFilterNames
361  global TeamName
362 
363  #******************************
364  # jobtick() - Where we run the job
365  #******************************
366  def jobtick():
367  Jobs.RunNextRequest(datetime.datetime.now())
368 
369  #******************************
370  # Deal with the arguments
371  #******************************
372  parser = argparse.ArgumentParser(description="Start impairment pattern.")
373 
374  parser.add_argument("-s", "--skip", type=int,
375  help="Skip ahead: start at N seconds.")
376 
377  parser.add_argument("-C", "--do_not_use_initial_defaults", action="store_true",
378  dest="no_initial_defaults",
379  help="Do not establish defaults before running scheduled tasks.")
380 
381  parser.add_argument("-D", "--initial_defaults_then_exit", action="store_true",
382  dest="initial_defaults_only",
383  help="Establish defaults and then exit, supersedes -C.")
384 
385  parser.add_argument("-l", "--loop", action="store_true",
386  help="Loop/Cycle forever.")
387 
388  parser.add_argument("-S", "--syslog", action="store_true",
389  help="Send reports to the system defined syslog server (level LOG_INFO).")
390 
391  parser.add_argument("-T", "--team", required=True, metavar="<team name>",
392  help="Team name.")
393 
394  parser.add_argument("mm_hostname", metavar="<Mini Maxwell host name or IP address>",
395  help='The host name or IP address of the Mini Maxwell/Maxwell G.')
396 
397  pargs = parser.parse_args()
398 
399  TeamName = pargs.team
400  if pargs.syslog:
401  UseSyslog = True
402 
403  mm_hostname = pargs.mm_hostname
404 
405  if AllFilterNames is None:
406  AllFilterNames = setfilters.GetAllFilterNames(mm_hostname)
407 
408  #**************************************************
409  # Do we just set the defaults and then exit?
410  #**************************************************
411  if pargs.initial_defaults_only:
412  # Coerce default filter settings and impairments to a known (default) state
413  ShowMessage("Coercing filters and impairments to default state, then exiting")
414  SetMM(mm_hostname, mm2client.Bands(), [], [], AllFilterNames)
415  ShowMessage("Stopping - Complete")
416  sys.exit()
417 
418  #**************************************************
419  # Set default values (unless suppressed)
420  #**************************************************
421  if pargs.no_initial_defaults:
422  ShowMessage("Not setting initial defaults")
423  else:
424  # Coerce default filter settings and impairments to a known (default) state
425  ShowMessage("Coercing filters and impairments to default state")
426  SetMM(mm_hostname, mm2client.Bands(), [], [], AllFilterNames)
427 
428  skipsecs = pargs.skip
429 
430  #**************************************************
431  # Main scheduling loop
432  # Note: This loop does not repeat setting of the
433  # default values. But it does repeat the initial
434  # (time zero) job.
435  # Also note that any skipping of initial time is
436  # done only on the first iteration of this loop.
437  # When looping the first job of the new iteration
438  # will start immediately.
439  #**************************************************
440  while True:
441  StartTime = datetime.datetime.now()
442 
443  Jobs = RunList(StartTime, skipsecs)
444  skipsecs = 0 # We only do the skip on the first cycle
445 
446  SetupJobs(Jobs, mm_hostname)
447 
448  # Come up to date with initial and skipped tasks.
449  ShowMessage("Running initial and skipped tasks to establish state.")
450  while Jobs.RunNextRequest(StartTime):
451  pass # Yes, we want to pass, all the work is done in the RunNextRequest()
452 
453  ShowMessage("Beginning scheduled events")
454 
455  # Give ourselves a lifetime.
456  # Add a few seconds to let the last job complete.
457  sleepsecs = (Jobs.LastRequestStartTime - datetime.datetime.now()).total_seconds() + 5
458 
459  # Begin the timer system with a one second interval
460  rt = RepeatedTimer(1, jobtick)
461  try:
462  ShowMessage("Start running for %u seconds." % sleepsecs)
463  time.sleep(sleepsecs)
464  finally:
465  rt.stop() # better in a try/finally block to make sure the program ends!
466 
467  if not pargs.loop:
468  break
469 
470  ShowMessage("Beginning again")
471 
472  ShowMessage("Stopping - Finished")
473 
474 #**********************************************************************
475 # Entry
476 #**********************************************************************
477 if __name__ == "__main__":
478 
479  try:
480  main()
481  except Exception, err:
482  ShowMessage("Unhandled exception:", str(err))
483  traceback.print_exc()
484 
485  except KeyboardInterrupt:
486  ShowMessage("Keyboard Interruptception")
def A2BFiltmap(self)
Definition: periodic.py:269
def LastRequestStartTime(self)
Definition: periodic.py:326
def Bands(self)
Definition: periodic.py:265
def __lt__(self, other)
Definition: periodic.py:220
def IsDone(self)
Definition: periodic.py:277
def GetAllFilterNames(mm_hostname)
Definition: setfilters.py:34
def Run(self, now)
Definition: periodic.py:283
def IsReadyToGo(self, now)
Definition: periodic.py:280
def Name(self)
Definition: periodic.py:253
def main()
Definition: periodic.py:358
def SetupJobs(Jobs, mm_hostname)
Definition: periodic.py:43
def __gt__(self, other)
Definition: periodic.py:232
def __init__(self, reqname, mm2host, dowhen, bands, a2b_filtmap, b2a_filtmap)
Definition: periodic.py:194
def __repr__(self)
Definition: periodic.py:238
def AddRequest(self, reqname, mm2host, startsec, bands, a2b_filtmap, b2a_filtmap)
Definition: periodic.py:301
def RunNextRequest(self, nowtime)
Definition: periodic.py:307
def __init__(self, interval, function, args, kwargs)
Definition: periodic.py:165
def B2AFiltmap(self)
Definition: periodic.py:273
def __ne__(self, other)
Definition: periodic.py:229
def __ge__(self, other)
Definition: periodic.py:235
def DoWhen(self)
Definition: periodic.py:261
def ChangeBandsOnMM(bandsobj, mm2host)
Definition: mm2client.py:479
def __le__(self, other)
Definition: periodic.py:223
def MM2HostName(self)
Definition: periodic.py:257
def __init__(self, now, skipsecs)
Definition: periodic.py:294
def __str__(self)
Definition: periodic.py:249
def __eq__(self, other)
Definition: periodic.py:226
def SetMM(mm_hostname, bands, a2bfilt_vals, b2afilt_vals, all_filter_names)
Definition: periodic.py:338
def SetFiltMap(mm2host, a2bfilt_vals, b2afilt_vals, all_filter_names=None)
Definition: setfilters.py:69
def ShowMessage(args)
Definition: periodic.py:144
def PrintMe(self)
Definition: periodic.py:331


mini_maxwell
Author(s): Yusuke Furuta
autogenerated on Wed Jul 10 2019 03:47:09