pfilter.py
#!/usr/bin/env python3
import numpy as np
import random as rd

############################################################################
###               Functions implementing a particle filter
############################################################################
# Note:
#    To instantiate a particle filter you will need a motion and an
#    appearance model.  Signatures and descriptions of the motion and
#    appearance models:
#        (optional) motion.make_set: (int)                -> list state
#        motion.predict:             (control, state)     -> state
#        appearance.weight:          (measurement, state) -> double
#
#    What is considered a 'state' must agree between the motion and
#    appearance classes.
#
#    Optional:
#       The particle filter can be sped up by defining additional methods:
#          * appearance.weight_partial - partial application over the measurement
#          * appearance.weight_set     - weight an entire particle set at once
#          * motion.predict_partial    - partial application over the control
#          * motion.predict_set        - predict an entire particle set at once
#
#    A minimal sketch of conforming models follows this note.

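# The two classes below are a minimal sketch, not part of the original API:
# a hypothetical 1-D random-walk motion model and a Gaussian appearance
# model whose signatures satisfy the interface documented above.  Here a
# 'state' is a plain float.
class ExampleMotion:
    """ Hypothetical motion model: drift by the control input, plus Gaussian noise """
    def make_set(self, n):
        return [0.0 for _ in range(n)]

    def predict(self, control, state):
        return state + control + rd.gauss(0.0, 0.1)

class ExampleAppearance:
    """ Hypothetical appearance model: Gaussian likelihood of the measurement """
    def weight(self, measurement, state):
        return np.exp(-0.5 * (measurement - state) ** 2)
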
# Default predicates for the should_resample_func argument of PFilter.step
def retTrue(*args):
    return True

def retFalse(*args):
    return False

class PFilter:
    def __init__(self, motion, appearance):
        """ Makes a particle filter """
        self.motion = motion
        self.appearance = appearance

    def step(self, control_input, measurement, particle_set, draw_func=None,
             set_op=False, should_resample_func=retTrue):
        """ Go through one cycle of particle filtering """
        predictions  = predict(self.motion, control_input, particle_set)
        weighted_set = likelihood(self.appearance, measurement, predictions)
        if draw_func is not None:
            draw_func(weighted_set)

        if should_resample_func():
            if set_op:
                normalized_set = set_norm_likelihood(weighted_set)
                retVal = set_resample_uss(particle_set.shape[1], normalized_set)
            else:
                normalized_set = normalize_likelihood(weighted_set)
                retVal = resample_uss(len(particle_set), normalized_set)
        else:
            # Change by Travis: skip resampling, but return the weights as
            # part of the particle set
            retVal = weighted_set

        return retVal

############################################################################
###                                Helpers
############################################################################
def predict(motion_model, control_input, particle_set):
    """ Predict using motion model """
    if hasattr(motion_model, "predict_partial"):
        f = motion_model.predict_partial(control_input)
        predictions = [f(particle) for particle in particle_set]
    elif hasattr(motion_model, "predict_set"):
        predictions = motion_model.predict_set(control_input, particle_set)
    else:
        predictions = [motion_model.predict(control_input, particle) for particle in particle_set]
    return predictions


def likelihood(appearance_model, measurement, particle_set):
    """ Evaluate using appearance model """
    if hasattr(appearance_model, "weight_set"):
        weighted = appearance_model.weight_set(measurement, particle_set)
    elif hasattr(appearance_model, "weight_partial"):
        f = appearance_model.weight_partial(measurement)
        weighted = [(particle, f(particle)) for particle in particle_set]
    else:
        weighted = [(particle, appearance_model.weight(measurement, particle)) for particle in particle_set]
    return weighted

def resample_uss(num_samples, particles):
    """
        Universal stochastic sampler (low-variance resampling)
        num_samples - number of samples desired
        particles   - list of (state, weight) tuples; weights should sum to 1
    """
    samples = []
    r       = rd.random() * (1.0 / float(num_samples))
    c       = particles[0][1]
    i       = 0

    for m in range(num_samples):
        # One evenly spaced pointer per desired sample
        U = r + m * (1.0 / float(num_samples))
        while U > c:
            i = i + 1
            if i >= len(particles):
                # Guard against floating point round-off in the running sum
                i = 0
            c = c + particles[i][1]
        samples.append(particles[i][0])
    return samples

def set_resample_uss(num_samples, particles):
    """
        Universal stochastic sampler (low-variance resampling)
        num_samples - number of samples desired
        particles   - 5 x N matrix with one particle per column (layout
                      below); the weight row should sum to 1
    """
    ##[[ x1     x2     ...  ]
    ## [ y1     y2     ...  ]
    ## [ 0.     0.     ...  ]
    ## [ 1.     1.     ...  ]
    ## [ w1     w2     ...  ]]

    samples = np.matrix(np.zeros((4, num_samples)))

    r = rd.random() * (1.0 / float(num_samples))
    c = particles[4, 0]
    i = 0

    for m in range(num_samples):
        U = r + m * (1.0 / float(num_samples))
        while U > c:
            i = i + 1
            if i >= particles.shape[1]:
                # Guard against floating point round-off in the running sum
                i = 0
            c = c + particles[4, i]
        # Keep the state rows, drop the weight row
        samples[:, m] = particles[0:4, i]
    return samples


def normalize_likelihood(weighted_particles):
    """ Make all the particle weights sum up to 1 """
    total_weight = sum(weight for _, weight in weighted_particles)
    return [(part, weight / total_weight) for part, weight in weighted_particles]

def set_norm_likelihood(weighted_particles):
    """ Normalize the weight row of a 5 x N matrix-format particle set """
    wp = weighted_particles
    wSum = np.sum(wp[4, :])
    wp[4, :] = wp[4, :] / wSum
    return wp


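# A minimal sketch (hypothetical data, not part of the original file) of the
# matrix-format path used by step(..., set_op=True): build a 5 x N particle
# matrix [x; y; 0; 1; w], normalize the weight row, and resample.
def _set_format_demo(num_samples=3):
    ps = np.matrix([[0.0, 1.0, 2.0],   # x
                    [0.0, 1.0, 2.0],   # y
                    [0.0, 0.0, 0.0],   # zeros row
                    [1.0, 1.0, 1.0],   # ones row
                    [1.0, 2.0, 3.0]])  # unnormalized weights w
    normed = set_norm_likelihood(ps)
    return set_resample_uss(num_samples, normed)  # 4 x num_samples states
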
if __name__ == "__main__":
    particles     = [("4", 4), ("1", 1), ("2", 2), ("3", 3)]
    normalized    = normalize_likelihood(particles)

    num_particles = 1000
    new_particles = resample_uss(num_particles, normalized)

    # Each state should be resampled roughly in proportion to its weight
    counts = {}
    for name, _ in particles:
        counts[name] = 0

    for p in new_particles:
        counts[p] = 1 + counts[p]

    for k in counts.keys():
        print(k, ":", counts[k] / float(num_particles))
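
    # Demo of one full PFilter cycle, using the illustrative (hypothetical)
    # models defined near the top of the file; states are plain floats here.
    pf = PFilter(ExampleMotion(), ExampleAppearance())
    start_set = pf.motion.make_set(100)
    new_set = pf.step(control_input=1.0, measurement=1.0, particle_set=start_set)
    print("mean state after one step:", sum(new_set) / len(new_set))

    # Demo of the matrix-format resampling path
    print("set-format samples:\n", _set_format_demo())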


pfilter
Author(s): Travis Deyle, Hai Nguyen; Advisor: Prof. Charlie Kemp; Lab: Healthcare Robotics Lab at Georgia Tech