import numpy as np
import random as rd


def retTrue(*args):
    """Always resample; the default predicate for PFilter.step."""
    return True


def retFalse(*args):
    """Never resample; keep the weighted set as-is."""
    return False


class PFilter:
    def __init__(self, motion, appearance):
        """Make a particle filter from a motion model and an appearance model."""
        self.motion = motion
        self.appearance = appearance

    def step(self, control_input, measurement, particle_set, draw_func=None,
             set_op=False, should_resample_func=retTrue):
        """Run one predict/weight/resample cycle of particle filtering.

        Set set_op=True when particle_set is a 5xN matrix (rows 0-3 hold
        the state, row 4 the weight) rather than a list of particles.
        """
        # Propagate every particle through the motion model.
        predictions = predict(self.motion, control_input, particle_set)

        # Weight each prediction against the measurement.
        weighted_set = likelihood(self.appearance, measurement, predictions)

        if draw_func is not None:
            draw_func(weighted_set)

        if should_resample_func():
            if set_op:
                normalized_set = set_norm_likelihood(weighted_set)
                ret_val = set_resample_uss(particle_set.shape[1], normalized_set)
            else:
                normalized_set = normalize_likelihood(weighted_set)
                ret_val = resample_uss(len(particle_set), normalized_set)
        else:
            ret_val = weighted_set

        return ret_val
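
# A minimal usage sketch (hedged; motion_model, appearance_model, and the
# inputs below are hypothetical placeholders, not part of this module):
#
#   pf = PFilter(motion_model, appearance_model)
#   particles = list_of_initial_states
#   for control, measurement in inputs:
#       particles = pf.step(control, measurement, particles)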


def predict(motion_model, control_input, particle_set):
    """Predict each particle's next state with the motion model.

    Dispatches on the model's interface: predict_partial (curried on the
    control input), predict_set (whole-set operation), or per-particle
    predict, in that order of preference.
    """
    if hasattr(motion_model, "predict_partial"):
        f = motion_model.predict_partial(control_input)
        predictions = [f(particle) for particle in particle_set]
    elif hasattr(motion_model, "predict_set"):
        predictions = motion_model.predict_set(control_input, particle_set)
    else:
        predictions = [motion_model.predict(control_input, particle)
                       for particle in particle_set]
    return predictions
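
# Hedged sketch of one motion-model shape predict() can dispatch on
# (DriftMotion is illustrative, not part of this module):
#
#   class DriftMotion:
#       def predict_partial(self, control_input):
#           return lambda particle: particle + control_input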


def likelihood(appearance_model, measurement, particle_set):
    """Weight each particle against the measurement with the appearance model.

    Dispatches on the model's interface: weight_set (whole-set operation),
    weight_partial (curried on the measurement), or per-particle weight,
    in that order of preference.
    """
    if hasattr(appearance_model, "weight_set"):
        weighted = appearance_model.weight_set(measurement, particle_set)
    elif hasattr(appearance_model, "weight_partial"):
        f = appearance_model.weight_partial(measurement)
        weighted = [(particle, f(particle)) for particle in particle_set]
    else:
        weighted = [(particle, appearance_model.weight(measurement, particle))
                    for particle in particle_set]
    return weighted
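
# Hedged sketch of one appearance-model shape likelihood() can dispatch on
# (GaussianAppearance is illustrative, not part of this module):
#
#   class GaussianAppearance:
#       def weight_partial(self, measurement):
#           return lambda p: np.exp(-0.5 * (measurement - p) ** 2)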


def resample_uss(num_samples, particles):
    """Universal stochastic sampler (low-variance resampling).

    num_samples - number of samples desired
    particles   - list of (state, weight) pairs whose weights sum to 1
    """
    samples = []
    spacing = 1.0 / float(num_samples)
    r = rd.random() * spacing        # random offset in [0, 1/num_samples)
    c = particles[0][1]              # running cumulative weight
    i = 0

    for m in range(num_samples):
        U = r + m * spacing          # evenly spaced sampling points
        while U > c:
            i += 1
            if i >= len(particles):
                i = 0                # guard against weights summing to < 1
            c += particles[i][1]
        samples.append(particles[i][0])
    return samples
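
# Worked example (a hedged illustration, not from the original): with
# particles [("a", 0.75), ("b", 0.25)] and num_samples=4, the evenly spaced
# sampling points always land three times in "a"'s weight span and once in
# "b"'s, so resample_uss returns ["a", "a", "a", "b"].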


def set_resample_uss(num_samples, particles):
    """Universal stochastic sampler (low-variance resampling), matrix form.

    num_samples - number of samples desired
    particles   - 5xN matrix; rows 0-3 hold the state, row 4 the weight
    """
    samples = np.matrix(np.zeros((4, num_samples)))

    spacing = 1.0 / float(num_samples)
    r = rd.random() * spacing
    c = particles[4, 0]
    i = 0

    for m in range(num_samples):
        U = r + m * spacing
        while U > c:
            i += 1
            if i >= particles.shape[1]:
                i = 0
            c += particles[4, i]
        samples[:, m] = particles[0:4, i]
    return samples
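
# Worked example (a hedged illustration, not from the original): for a 5x2
# matrix whose weight row is [0.75, 0.25] and num_samples=4, the same
# low-variance sweep copies column 0's state into three output columns and
# column 1's state into one.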


def normalize_likelihood(weighted_particles):
    """Scale the particle weights so that they sum to 1."""
    total_weight = sum(weight for _, weight in weighted_particles)
    return [(particle, weight / total_weight)
            for particle, weight in weighted_particles]


def set_norm_likelihood(weighted_particles):
    """Scale row 4 (the weights) of a 5xN particle matrix to sum to 1.

    Normalizes in place and returns the same matrix.
    """
    wp = weighted_particles
    w_sum = np.sum(wp[4, :])
    wp[4, :] = wp[4, :] / w_sum
    return wp


if __name__ == "__main__":
    # Sanity check: resample from four weighted particles and verify that
    # each state survives in proportion to its weight.
    particles = [("4", 4), ("1", 1), ("2", 2), ("3", 3)]
    normalized = normalize_likelihood(particles)

    num_particles = 1000
    new_particles = resample_uss(num_particles, normalized)

    counts = {}
    for name, _ in particles:
        counts[name] = 0

    for p in new_particles:
        counts[p] = 1 + counts[p]

    for k in counts:
        print(k, ":", counts[k] / float(num_particles))
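
    # A second sanity check for the matrix-based path (a hedged sketch; the
    # random toy states below are illustrative, not from the original):
    set_particles = np.matrix(np.vstack([np.random.rand(4, 4),
                                         np.array([[4.0, 1.0, 2.0, 3.0]])]))
    set_normalized = set_norm_likelihood(set_particles)
    set_resampled = set_resample_uss(num_particles, set_normalized)
    print("set-based resample shape:", set_resampled.shape)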
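
    # End-to-end check of PFilter with toy 1-D models (a hedged sketch;
    # ToyMotion and ToyAppearance are illustrative stand-ins, not part of
    # the original module):
    class ToyMotion:
        def predict(self, control_input, particle):
            # Shift by the control input plus a little Gaussian noise.
            return particle + control_input + rd.gauss(0.0, 0.1)

    class ToyAppearance:
        def weight(self, measurement, particle):
            # Gaussian likelihood of the measurement given the particle state.
            return np.exp(-0.5 * (measurement - particle) ** 2)

    pf = PFilter(ToyMotion(), ToyAppearance())
    toy_particles = [rd.uniform(-1.0, 1.0) for _ in range(200)]
    for _ in range(5):
        toy_particles = pf.step(1.0, 5.0, toy_particles)
    print("toy filter mean:", np.mean(toy_particles))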