00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 from collections import deque
00030 import numpy as np
00031 import cv
00032
00033 class MeanFilter(object):
00034 def __init__(self, window_size=3):
00035 self.window_size = window_size
00036 self.observations = deque([], window_size)
00037 def observation(self, obs):
00038 self.observations.append(obs)
00039 weights = range(len(self.observations), 0, -1)
00040 return np.int32(np.average(self.observations, axis=0, weights=weights)).tolist()
00041
00042 class KalmanFilter(object):
00043 initialized = False
00044 def __init__(self, cov, dynam_params, measure_params, control_params=0):
00045 self.kf = cv.CreateKalman(dynam_params, measure_params, control_params)
00046 cv.SetIdentity(self.kf.measurement_matrix, cv.RealScalar(1))
00047 self.set_cov(cv.fromarray(cov))
00048
00049 def set_cov(self, cov):
00050 cov = np.float32(cov)
00051 cv.Copy(cv.fromarray(cov), self.kf.measurement_noise_cov)
00052 assert np.all(cov == np.asarray(self.kf.measurement_noise_cov)), "Covariance matrix didn't get set"
00053
00054 def set_control(self, cont):
00055 cont = np.float32(cont)
00056 cv.Copy(cv.fromarray(cont), self.kf.control_matrix)
00057 assert np.all(cont == np.asarray(self.kf.control_matrix)), "Control matrix didn't get set"
00058
00059 def observation(self, meas):
00060 meas = np.float32([meas])
00061 if not self.initialized:
00062 cv.Copy(cv.fromarray(meas.T.copy()), self.kf.state_post)
00063 cv.Copy(cv.fromarray(meas.T.copy()), self.kf.state_pre)
00064 self.initialized = True
00065 if self.kf.CP == 0:
00066 cv.KalmanPredict(self.kf)
00067 corrected = np.asarray(cv.KalmanCorrect(self.kf, cv.fromarray(meas.T.copy())))
00068 return corrected.T[0].copy()
00069
00070 def control(self, signal):
00071 signal = np.float32([signal])
00072 if not self.initialized:
00073 return np.zeros(1,self.kf.DP)
00074 predicted = np.asarray(cv.KalmanPredict(self.kf, cv.fromarray(signal.T.copy())))
00075 return predicted.T[0].copy()
00076
00077
00078 if __name__ == '__main__':
00079 def make_obs(x):
00080
00081 gt = [x,x]
00082
00083
00084
00085 return gt, np.random.multivariate_normal(gt, cov)
00086
00087 import matplotlib.pyplot as plt
00088
00089 cov = np.ones((2,2))*100
00090 f = KalmanFilter(cov, 2, 2, 2)
00091 f.set_control(cv.fromarray(np.matrix([[1.0,0.0],[0.0,1.0]])))
00092 print np.asarray(f.kf.control_matrix)
00093
00094 observations = []
00095 filtered = []
00096 gts = []
00097
00098 for gt, obs in (make_obs(x) for x in xrange(0,100,1)):
00099 gts.append(gt)
00100
00101 observations.append(obs)
00102
00103 corrected = f.observation(obs)
00104
00105 predicted = f.control([1,0])
00106 filtered.append(corrected)
00107
00108 observations = np.array(observations)
00109 filtered = np.array(filtered)
00110 gts = np.array(gts)
00111
00112 plt.clf()
00113 plt.plot(observations[:,0])
00114 plt.hold(True)
00115 plt.plot(filtered[:,0])
00116 plt.legend(['obs', 'filtered'])
00117 plt.show()