delay_queue.py
Go to the documentation of this file.
00001 #! /usr/bin/env python
00002 
00003 #
00004 # Software License Agreement (BSD License)
00005 #
00006 # Copyright (c) 2008, Willow Garage, Inc.
00007 # All rights reserved.
00008 #
00009 # Redistribution and use in source and binary forms, with or without
00010 # modification, are permitted provided that the following conditions
00011 # are met:
00012 #
00013 #  * Redistributions of source code must retain the above copyright
00014 #    notice, this list of conditions and the following disclaimer.
00015 #  * Redistributions in binary form must reproduce the above
00016 #    copyright notice, this list of conditions and the following
00017 #    disclaimer in the documentation and/or other materials provided
00018 #    with the distribution.
00019 #  * Neither the name of Willow Garage, Inc. nor the names of its
00020 #    contributors may be used to endorse or promote products derived
00021 #    from this software without specific prior written permission.
00022 #
00023 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00024 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00025 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00026 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00027 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00028 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00029 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00030 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00031 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00032 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00033 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00034 # POSSIBILITY OF SUCH DAMAGE.
00035 #
00036 
00037 ##\author Derek King
00038 
00039 import unittest
00040 import sys 
00041 from itertools import islice
00042 
class SortedMarker:
    """Locate positions in a sorted list, using the previous result as a hint.

    The index returned by the last find() is remembered in <cached_index>
    and used as the starting point of the next (linear) search, so lookups
    for slowly changing values only inspect a few neighbouring elements.
    """

    def __init__(self, key):
        """<key> is a callable mapping a list element to its sort value."""
        self.key = key
        # Search hint: index returned by the most recent find()
        self.cached_index = 0

    def find(self, L, value):
        """ Returns first index in sorted list <L> where key(L[index]) > value
        uses cached index to speed lookup
        returns length of list when no element is greater than value
        raises RuntimeError when <L> is empty
        """
        if len(L) == 0:
            raise RuntimeError("input list L is empty")

        key = self.key
        # Clamp the hint into [0, len(L)-1].  The list may have shrunk since
        # the hint was stored, and a negative hint would silently wrap
        # around through negative indexing and yield a wrong result.
        index = self.cached_index
        if index >= len(L):
            index = len(L) - 1
        if index < 0:
            index = 0

        if value >= key(L[index]):
            # Hint is at or before the answer: walk forward
            while (index < len(L)) and (value >= key(L[index])):
                index += 1
        else:
            # Hint is past the answer: walk backward to the last element
            # that is <= value (or to -1), then step one forward
            while (index >= 0) and (value < key(L[index])):
                index -= 1
            index += 1

        self.cached_index = index
        return index

    def findEx(self, L, value):
        """ Same as find(), but throws RuntimeError when no element of <L>
        is greater than value (instead of returning len(L))"""
        index = self.find(L, value)
        if index >= len(L):
            raise RuntimeError("Could not find value in list > %s" % str(value))
        return index

    def slow_find(self, L, value):
        """ Returns smallest index in sorted list <L> where key(L[index]) > value
        always scans from the start of the list and ignores the cached index
        throws RuntimeError when no such index exists
        """
        index = 0
        while (index < len(L)) and (value >= self.key(L[index])):
            index += 1
        if index >= len(L):
            raise RuntimeError("Could not find value in list > %s" % str(value))
        return index
00088 
00089 
class TestSortedMarker(unittest.TestCase):
    """Checks SortedMarker.findEx against sorted lists for every cached index."""

    def setUp(self):
        pass

    def run_lookup(self, marker, cached_index, L, value):
        """Run one lookup that is expected to succeed; a RuntimeError from
        findEx is reported as a test failure."""
        try:
            self.run_lookup_ex(marker, cached_index, L, value)
        except RuntimeError:
            self.fail("RuntimeError cached_index=%d, value=%f, " % (cached_index, value))

    def run_lookup_ex(self, marker, cached_index, L, value):
        """Force the marker's cached index, look up <value> and verify that
        the returned index is the first element greater than <value>.
        RuntimeError from findEx is allowed to propagate."""
        marker.cached_index = cached_index
        index = marker.findEx(L, value)
        cached_val = str(L[cached_index]) if cached_index < len(L) else 'X'
        self.assertTrue(L[index] > value, "%d=%s,%f,%d=%f" % (cached_index, cached_val, value, index, L[index]))
        if index > 0:
            self.assertTrue(L[index-1] <= value, "%d=%s,%f,%d=%f" % (cached_index, cached_val, value, index, L[index-1]))

    # runs list with different cached indexes
    def run_list(self, marker, L):
        epsilon = 0.01
        # cached_index == len(L) is included on purpose: it exercises the
        # clamping of an out-of-range hint.
        for cached_index in range(len(L)+1):
            # Probe around EVERY element, including the last one; stopping
            # one short would make a single-element list run no checks.
            for item in L:
                self.run_lookup(marker, cached_index, L, item-epsilon)
                if item < L[-1]:
                    self.run_lookup(marker, cached_index, L, item)
                    self.run_lookup(marker, cached_index, L, item+epsilon)
                else:
                    # Nothing in the list is greater than its largest value
                    self.assertRaises(RuntimeError, self.run_lookup_ex, marker, cached_index, L, L[-1])
                    self.assertRaises(RuntimeError, self.run_lookup_ex, marker, cached_index, L, L[-1]+epsilon)

    # Single-element list
    def test_1_single(self):
        self.run_list(SortedMarker(float), [0.0])

    # Two distinct elements
    def test_2_double(self):
        self.run_list(SortedMarker(float), [0.0,1.0])

    # Runs of equal elements
    def test_3_equal(self):
        self.run_list(SortedMarker(float), [0.0,0.0,1.0,1.0])

    # Runs of equal elements mixed with a unique element
    def test_4_equal2(self):
        self.run_list(SortedMarker(float), [0.0,0,0,1.0,2.0,2.0])
00137 
00138 
00139 
class DelayQueue:
    """Buffers key-sorted elements and merges each one with its neighbours.

    Elements are held back until the newest queued element is more than
    <future_delay> ahead of them (by key).  At that point <merge> is called
    with the element plus iterators over the queued elements that lie within
    <future_delay> after it and within <past_delay> before it.  Elements
    that have fallen more than past_delay+future_delay behind the newest
    element are removed from the queue and returned to the caller.
    """

    def __init__(self, merge, key, future_delay, past_delay):
        """
        merge        -- callable invoked as merge(current, future_iter, past_iter)
        key          -- callable mapping an element to its sort value
        future_delay -- how far ahead of an element (in key units) to look
        past_delay   -- how far behind an element (in key units) to look
        """
        self.queue = []
        self.key = key
        self.merge = merge
        # One marker per moving position so that each cached search hint
        # stays close to the position it tracks.
        self.end_marker = SortedMarker(key=key)
        self.mid_marker = SortedMarker(key=key)
        self.begin_marker = SortedMarker(key=key)
        self.future_delay = future_delay
        self.past_delay = past_delay
        # Index (into self.queue) of the first element not yet merged
        self.last_mid_index = 0

    def process(self, input_list):
        """Append <input_list> to the queue, merge every element that now has
        its full future window available, and return the list of elements
        that dropped out of the queue.

        Returns [] immediately (without touching the queue) when
        <input_list> is empty.  SortedMarker.findEx raises RuntimeError when
        no queued element has a key greater than the time being looked up,
        e.g. when future_delay is not positive.
        """
        #  queue  = [2, 4, 4, 5, 6, 6, 7, 10]
        #            |                    |
        #            |                    +- newest element (start)
        #            |
        #            +- oldest element (end)
        #
        if len(input_list) == 0:
            return []

        key = self.key
        queue = self.queue + input_list

        # Elements before new_mid_index have key <= newest - future_delay,
        # so their complete future window is already in the queue.
        newest_time = key(queue[-1])
        new_mid_index = self.mid_marker.findEx(queue, newest_time - self.future_delay)

        for index in range(self.last_mid_index, new_mid_index):
            current = queue[index]
            current_time = key(current)
            # Future window end: first element later than current+future_delay.
            # begin_marker is used here so that end_marker's hint is not
            # dragged back and forth between the two window edges.
            start_index = self.begin_marker.find(queue, current_time + self.future_delay)
            # Past window start: first element later than current-past_delay
            end_index = self.end_marker.findEx(queue, current_time - self.past_delay)
            future_list = islice(queue, index+1, start_index)
            past_list = islice(queue, end_index, index)
            self.merge(current, future_list, past_list)

        # now cut off new end of queue: anything at or before
        # newest - past_delay - future_delay can no longer be part of a window
        end_time = newest_time - self.past_delay - self.future_delay
        end_index = self.end_marker.findEx(queue, end_time)

        output_list = queue[:end_index]
        self.queue = queue[end_index:]
        # Re-base the merge position onto the trimmed queue
        self.last_mid_index = new_mid_index - end_index

        return output_list
00198 
00199 
def print_merge(current, future, past):
    """Debug merge callback: print [past elements, current, future elements].

    <future> and <past> may be any iterables (DelayQueue passes islice
    iterators); they are consumed by this call.
    """
    # A single parenthesised expression prints identically under the
    # Python 2 print statement and the Python 3 print() function.
    print([list(past), current, list(future)])
00202 
00203 
def find_sublist(mainlist, sublist):
    """ Returns the lowest index at which sublist occurs as a contiguous run
    inside mainlist, or -1 when it does not occur at all """
    needle_len = len(sublist)
    for offset in range(len(mainlist) - needle_len + 1):
        # Compare element by element; stops at the first difference
        mismatch = any(mainlist[offset + k] != sublist[k] for k in range(needle_len))
        if not mismatch:
            return offset
    return -1
00217 
00218 
class TestDelayQueue(unittest.TestCase):
    """Feeds integer sequences through DelayQueue and checks that every
    element is merged once, in order, and eventually returned."""

    def setUp(self):
        # Elements handed to merge() as <current>, in call order
        self.merge_list = []

    def run_lookup(self, marker, cached_index, L, value):
        """Lookup that must not raise RuntimeError; if it does, fail the test."""
        try:
            self.run_lookup_ex(marker, cached_index, L, value)
        except RuntimeError:
            self.fail("RuntimeError cached_index=%d, value=%f, " % (cached_index, value))

    def run_lookup_ex(self, marker, cached_index, L, value):
        """Preset the marker's cached index, call find() and check that the
        result is the first element of L greater than value."""
        marker.cached_index = cached_index
        found = marker.find(L, value)
        if cached_index < len(L):
            cached_val = str(L[cached_index])
        else:
            cached_val = 'X'
        fmt = "%d=%s,%f,%d=%f"
        self.assertTrue(L[found] > value,
                        fmt % (cached_index, cached_val, value, found, L[found]))
        if found > 0:
            self.assertTrue(L[found-1] <= value,
                            fmt % (cached_index, cached_val, value, found, L[found-1]))

    def run_list(self, marker, L):
        """Probe around each element of L except the last, once for every
        possible cached index."""
        epsilon = 0.01
        for cached_index in range(len(L) + 1):
            for position in range(len(L) - 1):
                self.run_lookup(marker, cached_index, L, L[position] - epsilon)
                if not (L[position] < L[-1]):
                    self.assertRaises(RuntimeError, self.run_lookup_ex,
                                      marker, cached_index, L, L[-1])
                    self.assertRaises(RuntimeError, self.run_lookup_ex,
                                      marker, cached_index, L, L[-1] + epsilon)
                else:
                    self.run_lookup(marker, cached_index, L, L[position])
                    self.run_lookup(marker, cached_index, L, L[position] + epsilon)

    def merge(self, current, future, past):
        """Merge callback given to DelayQueue: records <current> and checks
        that past + current + future is a contiguous run of the input."""
        self.merge_list.append(current)
        window = list(past)
        window.append(current)
        window.extend(future)
        position = find_sublist(self.input, window)
        self.assertTrue(position != -1,
                        "can't find %s in input %s" % (str(window), str(self.input)))

    def verify_results(self, output):
        """Both the returned elements and the merged elements must equal the input."""
        self.assertTrue(output == self.input,
                        "out=%s, in=%s" % (str(output), str(self.input)))
        self.assertTrue(self.merge_list == self.input,
                        "merge=%s, in=%s" % (str(self.merge_list), str(self.input)))

    def test_1(self):
        """Insert one element per process() call."""
        dq = DelayQueue(self.merge, int, 2, 5)
        self.input = list(range(10))
        output = []
        for element in self.input:
            output.extend(dq.process([element]))
        output.extend(dq.process([1000]))  # flush
        self.verify_results(output)

    def test_2(self):
        """Insert all elements in a single process() call."""
        dq = DelayQueue(self.merge, int, 2, 5)
        self.input = list(range(10))
        output = []
        output.extend(dq.process(self.input))
        output.extend(dq.process([1000]))  # flush
        self.verify_results(output)

    def test_3(self):
        """Input containing several elements with the same key."""
        dq = DelayQueue(self.merge, int, 2, 5)
        self.input = [0, 1, 2, 3, 4, 5, 5, 5, 6, 7, 8, 9]
        output = []
        output.extend(dq.process(self.input))
        output.extend(dq.process([1000]))  # flush
        self.verify_results(output)
00292 
00293 
00294 
if __name__ == "__main__":
    # Executed as a script: discover and run the TestCase classes in this module.
    unittest.main()


mtrace_tools
Author(s): Derek
autogenerated on Sat Dec 28 2013 17:58:07