00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 import unittest
00040 import sys
00041 from itertools import islice
00042
00043 class SortedMarker:
00044 def __init__(self, key):
00045 self.key = key
00046 self.cached_index = 0
00047
00048 def find(self, L, value):
00049 """ Returns first index in sorted list <L> where value > key(L[index])
00050 uses cached index to speed lookup
00051 returns length of array when index can not be found
00052 """
00053 if len(L) == 0:
00054 raise RuntimeError("input list L is empty")
00055
00056 key = self.key
00057 index = self.cached_index
00058 if index >= len(L):
00059 index = len(L)-1
00060
00061 if (value >= key(L[index])):
00062 while (index < len(L)) and (value >= key(L[index])):
00063 index += 1
00064 else:
00065 while (index >= 0) and (value < key(L[index])):
00066 index -= 1
00067 index+=1
00068
00069 self.cached_index = index
00070 return index
00071
00072 def findEx(self, L, value):
00073 """ Throws exception when index cannot be found"""
00074 index = self.find(L, value)
00075 if index>=len(L):
00076 raise RuntimeError("Could not find value in list > %s" % str(value))
00077 return index
00078
00079
00080 def slow_find(self, L, value):
00081 """ Returns smallest index in sorted list <L> where value > key(L[index]) """
00082 index = 0
00083 while (index < len(L)) and (value >= self.key(L[index])):
00084 index += 1
00085 if index>=len(L):
00086 raise RuntimeError("Could not find value in list > %s" % str(value))
00087 return index
00088
00089
00090 class TestSortedMarker(unittest.TestCase):
00091
00092 def setUp(self):
00093 pass
00094
00095
00096 def run_lookup(self, marker, cached_index, L, value):
00097 try :
00098 self.run_lookup_ex(marker, cached_index, L, value)
00099 except RuntimeError:
00100 self.fail("RuntimeError cached_index=%d, value=%f, " % (cached_index, value))
00101
00102
00103 def run_lookup_ex(self, marker, cached_index, L, value):
00104 marker.cached_index = cached_index
00105 index = marker.findEx(L, value)
00106 cached_val = str(L[cached_index]) if cached_index < len(L) else 'X'
00107 self.assertTrue(L[index] > value, "%d=%s,%f,%d=%f" % (cached_index,cached_val,value,index,L[index]))
00108 if index > 0:
00109 self.assertTrue(L[index-1] <= value, "%d=%s,%f,%d=%f" % (cached_index,cached_val,value,index,L[index-1]))
00110
00111
00112
00113 def run_list(self, marker, L):
00114 epsilon = 0.01
00115 for cached_index in range(len(L)+1):
00116 for index in range(len(L)-1):
00117 self.run_lookup(marker, cached_index, L, L[index]-epsilon)
00118 if L[index] < L[-1]:
00119 self.run_lookup(marker, cached_index, L, L[index])
00120 self.run_lookup(marker, cached_index, L, L[index]+epsilon)
00121 else:
00122 self.assertRaises(RuntimeError, self.run_lookup_ex, marker, cached_index, L, L[-1])
00123 self.assertRaises(RuntimeError, self.run_lookup_ex, marker, cached_index, L, L[-1]+epsilon)
00124
00125
00126 def test_1_single(self):
00127 self.run_list(SortedMarker(float), [0.0])
00128
00129 def test_2_double(self):
00130 self.run_list(SortedMarker(float), [0.0,1.0])
00131
00132 def test_3_equal(self):
00133 self.run_list(SortedMarker(float), [0.0,0.0,1.0,1.0])
00134
00135 def test_4_equal2(self):
00136 self.run_list(SortedMarker(float), [0.0,0,0,1.0,2.0,2.0])
00137
00138
00139
00140 class DelayQueue:
00141 def __init__(self, merge, key, future_delay, past_delay):
00142 self.queue = []
00143 self.key = key
00144 self.merge = merge
00145 self.end_marker = SortedMarker(key=key)
00146 self.mid_marker = SortedMarker(key=key)
00147 self.begin_marker = SortedMarker(key=key)
00148 self.future_delay = future_delay
00149 self.past_delay = past_delay
00150 self.last_mid_index = 0
00151
00152 def process(self, input_list):
00153
00154
00155
00156
00157
00158
00159 if len(input_list) == 0:
00160 return []
00161
00162 key = self.key
00163 queue = self.queue + input_list
00164
00165
00166 start_time = key(queue[-1])
00167 mid_time = start_time - self.future_delay
00168
00169
00170 new_mid_index = self.mid_marker.findEx(queue,mid_time)
00171
00172
00173 for index in range(self.last_mid_index, new_mid_index):
00174 current = queue[index]
00175 mid_time = key(current)
00176 start_time = mid_time + self.future_delay
00177 end_time = mid_time - self.past_delay
00178 start_index = self.end_marker.find(queue,start_time)
00179 end_index = self.end_marker.findEx(queue,end_time)
00180 future_list = islice(queue,index+1,start_index)
00181 past_list = islice(queue,end_index,index)
00182 self.merge(current,future_list,past_list)
00183
00184
00185 start_time = key(queue[-1])
00186 end_time = start_time - self.past_delay - self.future_delay
00187 end_index = self.end_marker.findEx(queue,end_time)
00188
00189
00190 output_list = queue[:end_index]
00191
00192 self.queue = queue[end_index:]
00193
00194
00195 self.last_mid_index = new_mid_index-end_index
00196
00197 return output_list
00198
00199
00200 def print_merge(current, future, past):
00201 print [ [i for i in past], current, [i for i in future] ]
00202
00203
00204 def find_sublist(mainlist, sublist):
00205 """ Returns index in mainlist where sublist first occurs, or -1 if sublist cannot be found """
00206 match_index = -1
00207 for start in range( len(mainlist)-len(sublist)+1 ):
00208 local_match = True
00209 for i in range(len(sublist)):
00210 if (mainlist[start+i]!=sublist[i]):
00211 local_match = False
00212 break
00213 if local_match:
00214 match_index = start
00215 break
00216 return match_index
00217
00218
00219 class TestDelayQueue(unittest.TestCase):
00220
00221 def setUp(self):
00222 self.merge_list = []
00223
00224
00225 def run_lookup(self, marker, cached_index, L, value):
00226 try :
00227 self.run_lookup_ex(marker, cached_index, L, value)
00228 except RuntimeError:
00229 self.fail("RuntimeError cached_index=%d, value=%f, " % (cached_index, value))
00230
00231
00232 def run_lookup_ex(self, marker, cached_index, L, value):
00233 marker.cached_index = cached_index
00234 index = marker.find(L, value)
00235 cached_val = str(L[cached_index]) if cached_index < len(L) else 'X'
00236 self.assertTrue(L[index] > value, "%d=%s,%f,%d=%f" % (cached_index,cached_val,value,index,L[index]))
00237 if index > 0:
00238 self.assertTrue(L[index-1] <= value, "%d=%s,%f,%d=%f" % (cached_index,cached_val,value,index,L[index-1]))
00239
00240
00241
00242 def run_list(self, marker, L):
00243 epsilon = 0.01
00244 for cached_index in range(len(L)+1):
00245 for index in range(len(L)-1):
00246 self.run_lookup(marker, cached_index, L, L[index]-epsilon)
00247 if L[index] < L[-1]:
00248 self.run_lookup(marker, cached_index, L, L[index])
00249 self.run_lookup(marker, cached_index, L, L[index]+epsilon)
00250 else:
00251 self.assertRaises(RuntimeError, self.run_lookup_ex, marker, cached_index, L, L[-1])
00252 self.assertRaises(RuntimeError, self.run_lookup_ex, marker, cached_index, L, L[-1]+epsilon)
00253
00254 def merge(self, current, future, past):
00255 self.merge_list.append(current)
00256
00257 L = [i for i in past] + [current] + [i for i in future]
00258 index = find_sublist(self.input, L)
00259 self.assertTrue(index!=-1, "can't find %s in input %s" %(str(L), str(self.input)))
00260
00261 def verify_results(self, output):
00262 self.assertTrue(output == self.input, "out=%s, in=%s" % (str(output), str(self.input)))
00263 self.assertTrue(self.merge_list == self.input, "merge=%s, in=%s" % (str(self.merge_list), str(self.input)))
00264
00265
00266 def test_1(self):
00267 dq = DelayQueue(self.merge,int,2,5)
00268 output = []
00269 self.input = [i for i in range(10)]
00270 for i in self.input:
00271 output += dq.process([i])
00272 output += dq.process([1000])
00273 self.verify_results(output)
00274
00275
00276 def test_2(self):
00277 dq = DelayQueue(self.merge,int,2,5)
00278 output = []
00279 self.input = [i for i in range(10)]
00280 output += dq.process(self.input)
00281 output += dq.process([1000])
00282 self.verify_results(output)
00283
00284
00285 def test_3(self):
00286 dq = DelayQueue(self.merge,int,2,5)
00287 output = []
00288 self.input = [0,1,2,3,4,5,5,5,6,7,8,9]
00289 output += dq.process(self.input)
00290 output += dq.process([1000])
00291 self.verify_results(output)
00292
00293
00294
00295 if __name__ == '__main__':
00296 unittest.main()