Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007 import random as rd
00008 import numpy as np, math
00009
00010 class PFilter:
00011 def __init__(self, motion_model, measurement_model, particles):
00012 self.motion_model = motion_model
00013 self.measurement_model = measurement_model
00014
00015
00016 self.p = particles
00017
00018 def motion( self, control_input ):
00019 print 'Motioning'
00020 new_xy = np.row_stack([ self.motion_model.predict( i[0:2] )
00021 for i in self.p ])
00022 new_p = np.column_stack([ new_xy, self.p[:,2] ])
00023 self.p = np.copy( new_p )
00024 return np.copy( new_p )
00025
00026 def measurement( self, measurement ):
00027
00028
00029 print 'Measuring.'
00030 w = np.array([ self.measurement_model.weight( measurement, i[0:2] ) for i in self.p ])
00031 new_w = self.p[:,2] * w
00032
00033 new_p = np.column_stack([ self.p[:,0:2], new_w ])
00034 self.p = np.copy( new_p )
00035 return np.copy( new_p )
00036
00037 def resample( self ):
00038 print 'Resampling'
00039 weighted_set = [ ( i[0:2], i[2] ) for i in self.p ]
00040 normalized_set = normalize_likelihood(weighted_set)
00041
00042 new_xy = np.row_stack([ i for i in resample_uss( len(self.p), normalized_set )])
00043 new_p = np.column_stack([ new_xy, np.ones( new_xy.shape[0] ) ])
00044 self.p = np.copy( new_p )
00045 return np.copy( new_p )
00046
00047 def step( self, control_input, measurement ):
00048 self.motion( control_input )
00049 self.measurement( measurement )
00050 self.resample()
00051
00052
00053 def resample_uss(num_samples, particles):
00054 """
00055 Universal stochastic sampler (low variance resampling)
00056 num_samples - number of samples desired
00057 particles - pairs of (state, weight) tuples
00058 """
00059 samples = []
00060 r = rd.random() * (1.0 / float(num_samples))
00061 c = (particles[0])[1]
00062 i = 0
00063
00064 for m in xrange(num_samples):
00065 U = r + m * (1.0 / float(num_samples))
00066
00067 while U > c:
00068 i = i + 1
00069 if i >= len(particles):
00070 i = 0
00071 c = c + (particles[i])[1]
00072 samples.append((particles[i])[0])
00073 return samples
00074
00075 def normalize_likelihood(weighted_particles):
00076 """ Make all the particle weights sum up to 1 """
00077 def add(a,b):
00078 apart, aw = a
00079 bpart, bw = b
00080 return ('', aw+bw)
00081 total_weight = (reduce(add, weighted_particles, ('',0.0)))[1]
00082 def normalize(a):
00083 part, weight = a
00084 return (part, weight/total_weight)
00085 return map(normalize, weighted_particles)
00086
00087 class NoMotion:
00088 def __init__( self ):
00089 print 'Starting NoMotion.'
00090
00091 def predict( self, p ):
00092 return p
00093
00094 class NoMeasure:
00095 def __init__( self ):
00096 print 'Starting NoMeasure.'
00097
00098 def weight( self, p, m ):
00099 return 0.5
00100
00101
00102
00103 if __name__ == '__main__':
00104
00105 X,Y = np.meshgrid( np.linspace(0,3,4),
00106 np.linspace(0,3,4) )
00107 xyw = np.row_stack([ X.flatten(),
00108 Y.flatten(),
00109 np.ones( X.shape ).flatten() ]).T
00110
00111 pf = PFilter( NoMotion(), NoMeasure(), xyw )
00112 pf.motion( 0 )
00113 pf.measurement( 0 )
00114 pf.resample()
00115
00116 print pf.p
00117
00118 pf.measurement( 0 )
00119 pf.p[0,2] = 5
00120 pf.resample()
00121
00122 print pf.p