circular_buffer.py
Go to the documentation of this file.
00001 #
00002 # Copyright (c) 2009, 2011 Georgia Tech Research Corporation
00003 # All rights reserved.
00004 #
00005 # Redistribution and use in source and binary forms, with or without
00006 # modification, are permitted provided that the following conditions are met:
00007 #     * Redistributions of source code must retain the above copyright
00008 #       notice, this list of conditions and the following disclaimer.
00009 #     * Redistributions in binary form must reproduce the above copyright
00010 #       notice, this list of conditions and the following disclaimer in the
00011 #       documentation and/or other materials provided with the distribution.
00012 #     * Neither the name of the Georgia Tech Research Corporation nor the
00013 #       names of its contributors may be used to endorse or promote products
00014 #       derived from this software without specific prior written permission.
00015 #
00016 # THIS SOFTWARE IS PROVIDED BY GEORGIA TECH RESEARCH CORPORATION ''AS IS'' AND
00017 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00018 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00019 # DISCLAIMED. IN NO EVENT SHALL GEORGIA TECH BE LIABLE FOR ANY DIRECT, INDIRECT,
00020 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00021 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
00022 # OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
00023 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
00024 # OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
00025 # ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00026 #
00027 
00028 #  \author Advait Jain and Charlie Kemp (Healthcare Robotics Lab, Georgia Tech.)
00029 
00030 
00031 import numpy as np
00032 import copy
00033 
00034 
00035 class CircularBuffer():
00036     # element_shape = () if you want a buffer of scalars.
00037     def __init__(self, max_size, element_shape):
00038         shp = [max_size] + list(element_shape)
00039         self.buf = np.zeros(shp)
00040         self.size = max_size
00041         self.n_vals = 0
00042         self.end_idx = -1 # index with the up to date data.
00043         self.start_idx = 0 # index with the up to date data.
00044 
00045     def append(self, val):
00046         self.end_idx = (self.end_idx + 1) % self.size
00047         if self.n_vals != 0 and self.end_idx == self.start_idx:
00048             self.start_idx = (self.start_idx + 1) % self.size
00049 
00050         self.n_vals = min(self.n_vals + 1, self.size)
00051         self.buf[self.end_idx] = val
00052 
00053     # remove item from the right
00054     def pop(self):
00055         if self.n_vals == 0:
00056             raise IndexError('pop from empty buffer')
00057         v = self.buf[self.end_idx]
00058         self.end_idx = (self.end_idx - 1) % self.size
00059         self.n_vals = self.n_vals - 1
00060         return v
00061 
00062     # convert the data into a list
00063     def to_list(self):
00064         return self.get_array().tolist()
00065     
00066     # convert circular buffer to an array.
00067     # returns np array of len self.n_vals
00068     def get_array(self):
00069         start_idx = self.start_idx
00070         end_idx = self.end_idx
00071         if self.n_vals == 0:
00072             return np.array([])
00073         if end_idx >= start_idx:
00074             arr = copy.copy(self.buf[start_idx:end_idx+1])
00075         else:
00076             arr1 = self.buf[start_idx:]
00077             arr2 = self.buf[:end_idx+1]
00078             arr = np.concatenate((arr1, arr2))
00079         return arr
00080 
00081     # get the last n elements.
00082     def get_last(self, n):
00083         if n > self.n_vals:
00084             raise IndexError('asking for too many elements')
00085         end_idx = self.end_idx
00086         start_idx = end_idx - (n-1)
00087         if start_idx < 0:
00088             a1 = self.buf[start_idx:]
00089             a2 = self.buf[:end_idx+1]
00090             a = np.concatenate((a1,a2))
00091         else:
00092             a = self.buf[start_idx:end_idx+1]
00093         return a
00094 
00095     def clear(self):
00096         self.n_vals = 0
00097         self.end_idx = -1
00098         self.start_idx = 0
00099 
00100     def convert_index(self, i):
00101         return (self.start_idx + i) % self.n_vals
00102 
00103     def check_index(self, i):
00104         if i >= self.n_vals or -i > self.n_vals:
00105             raise IndexError('index (i = %d) out of bounds, since n_vals = %d' % (i, self.n_vals))
00106 
00107     def __setitem__(self, i, val):
00108         self.check_index(i)
00109         i = self.convert_index(i)
00110         self.buf[i] = val
00111 
00112     def __getitem__(self, i):
00113         #print 'input argument for __getitem__ =', i
00114         if type(i) is type(slice(1)):
00115             start = i.start
00116             stop = i.stop
00117             step = i.step
00118             
00119             # clip the values of the slice to be within range
00120             if start is None:
00121                 start = 0
00122             if stop is None:
00123                 stop = self.n_vals
00124             if step is None:
00125                 step = 1
00126             if start < 0:
00127                 start = 0
00128             if stop > self.n_vals:
00129                 stop = self.n_vals
00130             if start > stop:
00131                 start = stop
00132 
00133             # convert the values to be indices to the circular buffer
00134             equal_indices = start==stop
00135             start = self.convert_index(start)
00136             stop = self.convert_index(stop - 1) + 1
00137             
00138             # return the requested range
00139             if equal_indices or (step >= self.n_vals):
00140                 #print "self.buf[start:start] = self.buf[%d:%d]" % (start,start)
00141                 return self.buf[start:start]
00142              
00143             if start < stop:
00144                 #print "self.buf[start:stop:step] = self.buf[%d:%d:%d]" % (start,stop,step)
00145                 return self.buf[start:stop:step]
00146             else:
00147                 wrap_start = ((self.n_vals - start) + step) % step
00148                 #print "np.concatenate([self.buf[start::step], self.buf[wrap_start:stop:step]]) ="
00149                 #print "np.concatenate([self.buf[%d::%d], self.buf[%d:%d:%d]])" % (start,step,wrap_start,stop,step)
00150                 return np.concatenate([self.buf[start::step], self.buf[wrap_start:stop:step]])
00151         else:
00152             self.check_index(i)
00153             i = self.convert_index(i)
00154             return self.buf[i]
00155 
00156     def __repr__(self):
00157         return str(self.to_list())
00158 
00159     def __len__(self):
00160         return self.n_vals
00161 
00162 
00163 
00164 if __name__ == '__main__':
00165     cb1 = CircularBuffer(5, ())
00166     cb1.append(1)
00167     cb1.append(2)
00168     cb1.append(3)
00169     cb1.append(4)
00170     cb1.append(5)
00171     cb1.append(6)
00172     cb1.append(7)
00173     cb1.append(8)
00174 
00175     print 'cb1:', cb1
00176     print 'cb1.buf:', cb1.buf
00177     print 'cb1[0::3]', cb1[0::3]
00178     print 'cb1.get_array()[0::3]', cb1.get_array()[0::3]
00179 
00180 
00181 


hrl_lib
Author(s): Cressel Anderson, Travis Deyle, Advait Jain, Hai Nguyen, Advisor: Prof. Charlie Kemp, Lab: Healthcare Robotics Lab at Georgia Tech
autogenerated on Wed Nov 27 2013 11:34:06