[1]:
%pylab inline
from inspect import getsource
import scipy.signal as signal
from IPython.display import Audio, display
from IPython.display import Image, SVG
Populating the interactive namespace from numpy and matplotlib
[2]:
from gps_helper.smoothing import RecursiveAverage, MovingAverageFilter, LowPassFilter
[3]:
pylab.rcParams['savefig.dpi'] = 100 # default 72
%config InlineBackend.figure_formats=['svg'] # SVG inline viewing

Signal Filtering

[4]:
figure(figsize=(6,5))
f0 =0.1
fs = 100
t = arange(0,20,1/fs)
s = sign(cos(2*pi*.1*t))
subplot(311)
title(r'0.1 Hz Squarewave with RC Lowpass')
ylabel(r'Signal - s')
plot(t,s)
grid()
subplot(312)
std_n = 0.4
r = s + std_n*randn(len(s))
plot(t,r)
ylabel(r's + n')
grid()
subplot(313)
f3 = 3*f0
a = exp(-2*pi*f3/fs)
rf = signal.lfilter([1-a],[1,-a],r)
plot(t,rf)
xlabel(r'Time (s)')
ylabel(r'LPF(s + n)')
grid()
tight_layout()
../_images/nb_examples_Kalman_Basics_practice_4_0.svg

Measurement Filtering

[5]:
print(getsource(RecursiveAverage))
class RecursiveAverage:
    """
    Recursive average filter implementation.
    """

    def __init__(self):
        self.k = 0.0
        self.x_k = 0

    def next_sample(self, x):
        """
        Process the next data sample.

        :param x: Next data sample to compute average
        :return: float
        """
        self.k += 1.0
        self.x_k = ((self.k - 1.) / self.k) * self.x_k + x / self.k
        return self.x_k

[6]:
print(getsource(MovingAverageFilter))
class MovingAverageFilter:
    """
    Moving average filter implementation.
    """

    def __init__(self, n):
        """
        :param n: Number of samples to move the average over
        """
        self.n = float(n)
        self.x_k = [0.0 for ii in range(0, int(self.n))]

    def next_sample(self, x):
        """
        Process the next sample.

        :param x: Next data sample to compute average
        :return: float
        """
        self.x_k[1:] = self.x_k[:-1]
        self.x_k[0] = x
        return sum(self.x_k) / self.n

[7]:
print(getsource(LowPassFilter))
class LowPassFilter:
    """
    Low pass filter implementation.
    """

    def __init__(self, alpha):
        """
        Provide the alpha weighting value to initialize.
        :param alpha: Weighting value
        """
        self.alpha = alpha
        self.prev = 0

    def next_sample(self, x):
        """
        Process the next sample.

        :param x: Next sample value.
        :return: float
        """
        xlpf = self.alpha * self.prev + (1 - self.alpha) * x
        self.prev = xlpf
        return xlpf

[8]:
Npts = 100
x = 5
v = 0.3*randn(Npts)
z = x + v
yravg = zeros_like(z)
ravg = RecursiveAverage()
y10 = zeros_like(z)
movavg10 = MovingAverageFilter(10)
y20 = zeros_like(z)
movavg20 = MovingAverageFilter(20)
lpf09 = LowPassFilter(0.9)
yfof09 = zeros_like(z)
for k, z_k in enumerate(z):
    yravg[k] = ravg.next_sample(z_k)
    y10[k] = movavg10.next_sample(z_k)
    y20[k] = movavg20.next_sample(z_k)
    yfof09[k] = lpf09.next_sample(z_k)
plot(arange(Npts),z,'r.',markersize=4)
plot(arange(Npts),yravg)
plot(arange(Npts),y10)
plot(arange(Npts),y20)
plot(arange(Npts),yfof09)
legend((r'Measurements',r'RecurAvg',r'MovAvg10',
        r'MovAvg20',r'First-Order $\alpha=0.9$'))
ylabel(r'Filter Output')
xlabel(r'Index $n$')
title(r'Measurement Filtering a Noisy Constant')
grid();
../_images/nb_examples_Kalman_Basics_practice_9_0.svg
[9]:
Npts = 100
n = arange(0,100)
x = 5*cos(2*pi*n/200)
v = 0.3*randn(Npts)
z = x + v
yravg = zeros_like(z)
ravg = RecursiveAverage()
y10 = zeros_like(z)
movavg10 = MovingAverageFilter(10)
y20 = zeros_like(z)
movavg20 = MovingAverageFilter(20)
lpf09 = LowPassFilter(0.9)
yfof09 = zeros_like(z)
for k, z_k in enumerate(z):
    yravg[k] = ravg.next_sample(z_k)
    y10[k] = movavg10.next_sample(z_k)
    y20[k] = movavg20.next_sample(z_k)
    yfof09[k] = lpf09.next_sample(z_k)
plot(arange(Npts),z,'r.',markersize=4)
plot(arange(Npts),yravg)
plot(arange(Npts),y10)
plot(arange(Npts),y20)
plot(arange(Npts),yfof09)
legend((r'Measurements',r'RecurAvg',r'MovAvg10',
        r'MovAvg20',r'First-Order $\alpha=0.9$'))
ylabel(r'Filter Output')
xlabel(r'Index $n$')
title(r'Measurement Filtering Half Cycle Cosine')
grid();
../_images/nb_examples_Kalman_Basics_practice_10_0.svg

Kalman Filtering Background

It was in 1960 that the Kalman filter was born. Today we are many many innovations beyond its humble beginnings.

05e4bb1c1eab405c8f47b61ac1028dd5

[10]:
Image('figs/Kalman_variables.png',width='60%')
[10]:
../_images/nb_examples_Kalman_Basics_practice_13_0.png

677b9b9997344f218ffa6ddbeaf40b0c

[11]:
Image('figs/Kalman_Filter.png',width='70%')
[11]:
../_images/nb_examples_Kalman_Basics_practice_15_0.png

Basic Examples from Kim

Chapter 10.2: Estimating a Constant Voltage

[12]:
class GetVoltage(object):
    """
    A class for generating the battery voltage measurements

    Mark Wickert February 2018
    """

    def __init__ (self, batt_voltage = 14.4, dt = 0.2, sigma_w = 2):
        """
        Initialize the object
        """
        self.sigma_w = sigma_w
        self.Voltage_set = batt_voltage

        self.dt = dt


    def measurement(self):
        """
        Take a measurement
        """
        w = 0 + self.sigma_w*random.randn(1)[0]
        z = self.Voltage_set + w
        return z
[13]:
class SimpleKalman(object):
    """
    Kim Chapter 10.2 Battery voltage estimation with measurement noise

    Python 3.x is assumed so the operator @ can be used for matrix multiply

    Mark Wickert February 2018
    """


    def __init__ (self, dt = 0.2, initial_state = 14, P = 6):
        """
        Initialize the object
        """
        self.dt = dt
        self.A = array([[1]])
        self.H = array([[1]])
        # Process model covariance
        self.Q = array([[0]])
        # Measurement model covariance
        self.R = 4
        self.x = array([[initial_state]])
        # Error covariance initialize
        self.P = P*eye(1)


    def next_sample(self,z):
        """
        Update the Kalman filter state by inputting a new
        scalar measurement. Return the state array as a tuple
        Update all other Kalman filter quantities
        """
        xp = self.A @ self.x
        Pp = self.A @ self.P @ self.A.T + self.Q

        self.K = Pp @ self.H.T * inv(self.H @ Pp @ self.H.T + self.R)

        self.x = xp + self.K @ (array([[z]] - self.H @ xp))
        self.P = Pp - self.K @ self.H @ Pp

        self.volt = self.x[0]
        return self.volt
[14]:
dt = 0.1
t = arange(0,10+dt,dt)
Xsaved = zeros((len(t),2))
Zsaved = zeros(len(t))
Ksaved = zeros(len(t))
Psaved = zeros(len(t))

# Create objects for the simulation
GetVoltage1 = GetVoltage(14.0,dt,sigma_w = 2)
SimpleKalman1 = SimpleKalman(initial_state=14)

for k in range(len(t)):
    z = GetVoltage1.measurement()
    Xsaved[k,:] = SimpleKalman1.next_sample(z)
    Zsaved[k] = z
    Ksaved[k] = SimpleKalman1.K
    Psaved[k] = SimpleKalman1.P
[15]:
plot(t,Zsaved,'r.')
plot(t,Xsaved[:,0])
ylabel(r'Voltage (V)')
xlabel(r'Time (s)')
legend((r'Measured',r'Kalman Filter'),loc='best')
title(r'Voltage Estimation')
grid();
../_images/nb_examples_Kalman_Basics_practice_21_0.svg
[16]:
subplot(211)
plot(t,Psaved)
ylabel(r'P')
xlabel(r'Time (s)')
title(r'Covariance ($\sigma_x^2$) as $P_k$')
grid();
subplot(212)
plot(t,Ksaved)
ylabel(r'K')
xlabel(r'Time (s)')
title(r'Kalman Gain $K_k$')
grid();
tight_layout()
../_images/nb_examples_Kalman_Basics_practice_22_0.svg
[17]:
plot(t,Ksaved)
ylabel(r'K')
xlabel(r'Time (s)')
grid();
../_images/nb_examples_Kalman_Basics_practice_23_0.svg

Notes Radial Position Example

[18]:
class GetPosVel(object):
    """
    A class for generating position and velocity
    measurements and truth values
    of the state vector.

    Mark Wickert May 2018
    """


    def __init__ (self,pos_set = 0, vel_set = 80.0, dt = 0.1,
                  Q = [[1,0],[0,3]], R = [[10,0],[0,2]]):
        """
        Initialize the object
        """
        self.actual_pos = pos_set
        self.actual_vel = vel_set

        self.Q = array(Q)
        self.R = array(R)
        self.dt = dt


    def measurement(self):
        """
        Take a measurement
        """
        # Truth position and velocity
        self.actual_vel = self.actual_vel
        self.actual_pos = self.actual_pos \
        + self.actual_vel*self.dt

        # Measured value is truth plus measurement error
        z1 = self.actual_pos + sqrt(self.R[0,0])*random.randn()
        z2 = self.actual_vel + sqrt(self.R[1,1])*random.randn()
        return array([[z1],[z2]])
[19]:
class PosKalman(object):
    """
    Position Estimation from Position and Velocity Measurements

    Python 3.x is assumed so the operator @ can be used for matrix multiply

    Mark Wickert May 2018
    """


    def __init__ (self, Q, R, initial_state = [0, 20], dt = 0.1):
        """
        Initialize the object
        """
        self.dt = dt
        self.A = array([[1, dt],[0,1]])
        self.H = array([[1,0],[0,1]])
        # Process model covariance
        self.Q = Q
        # Measurement model covariance
        self.R = R
        self.x = array([[initial_state[0]],[initial_state[1]]])
        # Error covariance initialize
        self.P = 5*eye(2)
        # Initialize state
        self.x = array([[0.0],[0.0]])


    def next_sample(self,z):
        """
        Update the Kalman filter state by inputting a new
        scalar measurement. Return the state array as a tuple
        Update all other Kalman filter quantities
        """
        xp = self.A @ self.x
        Pp = self.A @ self.P @ self.A.T + self.Q

        self.K = Pp @ self.H.T * inv(self.H @ Pp @ self.H.T + self.R)

        self.x = xp + self.K @ (z - self.H @ xp)
        self.P = Pp - self.K @ self.H @ Pp
        return self.x

Run a Simulation

[20]:
dt = 0.1
t = arange(0,10+dt,dt)
Xsaved = zeros((2,len(t)))
Zsaved = zeros((2,len(t)))
Psaved = zeros(len(t))
Vsaved = zeros(len(t))
# Save history of error covariance matrix diagonal
P_diag = zeros((len(t),2))

# Create objects for the simulation
Q = array([[1,0],[0,3]])
R = array([[10,0],[0,2]])
GetPos1 = GetPosVel(Q=Q,R=R,dt=dt)
PosKalman1 = PosKalman(Q,R, initial_state=[0,80])

for k in range(len(t)):
    # take a measurement
    z = GetPos1.measurement()
    # Update the Kalman filter
    Xsaved[:,k,None] = PosKalman1.next_sample(z)
    Zsaved[:,k,None] = z
    Psaved[k] = GetPos1.actual_pos
    Vsaved[k] = GetPos1.actual_vel
    P_diag[k,:] = PosKalman1.P.diagonal()
[21]:
figure(figsize=(6,3.5))
plot(t,Zsaved[0,:]-Psaved,'r.')
plot(t,Xsaved[0,:]-Psaved)
xlabel(r'Time (s)')
ylabel(r'Position Error (m)')
title(r'Position Error: $x_1 - p_{true}$')
legend((r'Measurement Error',r'Estimation Error'))
grid();
../_images/nb_examples_Kalman_Basics_practice_29_0.svg
[22]:
figure(figsize=(6,3.5))
plot(t,Zsaved[1,:]-Vsaved,'r.')
plot(t,Xsaved[1,:]-Vsaved)
xlabel(r'Time (s)')
ylabel(r'Velocity Error (m)')
title(r'Velocity Error: $x_2 - v_{true}$')
legend((r'Measurement Error',r'Estimation Error'))
grid();
../_images/nb_examples_Kalman_Basics_practice_30_0.svg
[23]:
figure(figsize=(6,3.5))
plot(t,P_diag[:,0])
plot(t,P_diag[:,1])
title(r'Covariance Matrix $\mathbf{P}$ Diagonal Entries')
ylabel(r'Variance')
xlabel(r'Time (s) (given $T_s = 0.1$s)')
legend((r'$\sigma_p^2$ (m$^2$)',r'$\sigma_v^2$ (m$^4$)'))
grid();
../_images/nb_examples_Kalman_Basics_practice_31_0.svg

Chapter 11.2 & 11.3: Estimating Velocity from Position

[24]:
class GetPos(object):
    """
    A class for generating position measurements as found in Kim

    Mark Wickert December 2017
    """


    def __init__ (self,Posp = 0, Vel_set = 80.0, dt = 0.1,
                  var_w = 10.0, var_v = 10.0):
        """
        Initialize the object
        """
        self.Posp = Posp
        self.Vel_set = Vel_set
        self.Velp = Vel_set

        self.dt = dt

        self.var_w = var_w
        self.var_v = var_v


    def measurement(self):
        """
        Take a measurement
        """
        # The velocity process noise
        w = 0 + self.var_w*random.randn(1)[0]
        # The position measurement noise
        v = 0 + self.var_v*random.randn(1)[0]

        # Update the position measurement
        z = self.Posp + self.Velp*self.dt + v
        # Also update the truth values of position and velocity
        self.Posp = z - v
        self.Velp = self.Vel_set + w
        return z
[25]:
class DvKalman(object):
    """
    Kim Chapter 11.2 Velocity from Position Estimation

    Python 3.x is assumed so the operator @ can be used for matrix multiply

    Mark Wickert December 2017
    """


    def __init__ (self,initial_state = [0, 20]):
        """
        Initialize the object
        """
        self.dt = 0.1
        self.A = array([[1, self.dt],[0,1]])
        self.H = array([[1,0]])
        # Process model covariance
        self.Q = array([[1,0],[0,3]])
        # Measurement model covariance
        self.R = 10
        self.x = array([[initial_state[0]],[initial_state[1]]])
        # Error covariance initialize
        self.P = 5*eye(2)
        # Initialize pos and vel
        self.pos = 0.0
        self.vel = 0.0


    def next_sample(self,z):
        """
        Update the Kalman filter state by inputting a new
        scalar measurement. Return the state array as a tuple
        Update all other Kalman filter quantities
        """
        xp = self.A @ self.x
        Pp = self.A @ self.P @ self.A.T + self.Q

        self.K = Pp @ self.H.T * inv(self.H @ Pp @ self.H.T + self.R)

        self.x = xp + self.K @ (array([[z]] - self.H @ xp))
        self.P = Pp - self.K @ self.H @ Pp

        self.pos = self.x[0]
        self.vel = self.x[1]
        return self.pos, self.vel

Run a Simulation

[26]:
dt = 0.1
t = arange(0,10+dt,dt)
Xsaved = zeros((len(t),2))
Zsaved = zeros(len(t))
Vsaved = zeros(len(t))

# Create objects for the simulation
GetPos1 = GetPos()
DvKalman1 = DvKalman()

for k in range(len(t)):
    z = GetPos1.measurement()
    # pos, vel = DvKalman1.update(z)

    Xsaved[k,:] = DvKalman1.next_sample(z)
    Zsaved[k] = z
    Vsaved[k] = GetPos1.Velp
[27]:
plot(t,Zsaved,'r.')
plot(t,Xsaved[:,0])
ylabel(r'Position (m)')
xlabel(r'Time (s)')
legend((r'Measured',r'Kalman Filter'),loc='best')
grid();
../_images/nb_examples_Kalman_Basics_practice_37_0.svg
[28]:
plot(t,Vsaved,'r.')
plot(t,Xsaved[:,1])
ylabel(r'Velocity (m/s)')
xlabel(r'Time (s)')
legend((r'True Speed',r'Kalman Filter'),loc='best')
grid();
../_images/nb_examples_Kalman_Basics_practice_38_0.svg

Chapter 11.4: Estimating Position from Velocity

[29]:
class GetVel(object):
    """
    A class for generating velocity measurements as found in Kim 11.4

    Mark Wickert December 2017
    """


    def __init__ (self,Pos_set = 0, Vel_set = 80.0, dt = 0.1, var_v = 10.0):
        """
        Initialize the object
        """
        self.Posp = Pos_set
        self.Vel_set = Vel_set
        self.Velp = Vel_set

        self.dt = dt

        self.var_v = var_v


    def measurement(self):
        """
        Take a measurement
        """
        # The velocity measurement noise
        v = 0 + self.var_v*random.randn(1)[0]

        # Also update the truth values of position and velocity
        self.Posp += self.Velp*self.dt
        self.Velp = self.Vel_set + v
        z = self.Velp
        return z
[30]:
class IntKalman(object):
    """
    Kim Chapter 11.4 Position from Velocity Estimation

    Python 3.x is assumed so the operator @ can be used for matrix multiply

    Mark Wickert December 2017
    """


    def __init__ (self,initial_state = [0, 20]):
        """
        Initialize the object
        """
        self.dt = 0.1
        self.A = array([[1, dt],[0,1]])
        self.H = array([[0,1]])
        # Process model covariance
        self.Q = array([[1,0],[0,3]])
        # Measurement model covariance
        self.R = 10
        self.x = array([[initial_state[0]],[initial_state[1]]])
        # Error covariance initialize
        self.P = 5*eye(2)
        # Initialize pos and vel
        self.pos = 0.0
        self.vel = 0.0


    def next_sample(self,z):
        """
        Update the Kalman filter state by inputting a new scalar measurement.
        Return the state array as a tuple
        Update all other Kalman filter quantities
        """
        xp = self.A @ self.x
        Pp = self.A @ self.P @ self.A.T + self.Q

        self.K = Pp @ self.H.T * inv(self.H @ Pp @ self.H.T + self.R)

        self.x = xp + self.K @ (array([[z]] - self.H @ xp))
        self.P = Pp - self.K @ self.H @ Pp

        self.pos = self.x[0]
        self.vel = self.x[1]
        return self.pos, self.vel
[31]:
dt = 0.1
t = arange(0,10+dt,dt)
Xsaved = zeros((len(t),2))
Zsaved = zeros(len(t))
Psaved = zeros(len(t))

# Create objects for the simulation
GetVel1 = GetVel()
IntKalman1 = IntKalman()

for k in range(len(t)):
    z = GetVel1.measurement()

    Xsaved[k,:] = IntKalman1.next_sample(z)
    Zsaved[k] = z
    Psaved[k] = GetVel1.Posp
[32]:
plot(t,Zsaved,'r.')
plot(t,Xsaved[:,1])
ylabel(r'Velocity (m/s)')
xlabel(r'Time (s)')
legend((r'Measurements',r'Kalman Filter'),loc='best')
grid();
../_images/nb_examples_Kalman_Basics_practice_43_0.svg
[33]:
plot(t,Xsaved[:,0])
plot(t,Psaved)
ylabel(r'Position (m)')
xlabel(r'Time (s)')
legend((r'True Position' ,r'Kalman Filter',),loc='best')
grid();
../_images/nb_examples_Kalman_Basics_practice_44_0.svg

Chapter 11.5: Measuring Velocity with Sonar

[34]:
# From scipy.io
from scipy.io import loadmat
[35]:
class GetSonar(object):
    """
    A class for playing back sonar altitude measurements as found in Kim 2.4
    and later used in Kim 11.5

    Mark Wickert December 2017
    """


    def __init__ (self,):
        """
        Initialize the object
        """
        sonarD = loadmat('SonarAlt')
        self.h = sonarD['sonarAlt'].flatten()
        self.Max_pts = len(self.h)
        self.k = 0


    def measurement(self):
        """
        Take a measurement
        """

        h = self.h[self.k]
        self.k += 1
        if self.k > self.Max_pts:
            print('Recycling data by starting over')
            self.k = 0
        return h
[36]:
Nsamples = 500
t = arange(Nsamples)*.02
Xsaved = zeros(Nsamples)
Xmsaved = zeros(Nsamples)
Xksaved = zeros((Nsamples,2))

GetSonar1 = GetSonar()
MovAvgFilter1 = MovingAverageFilter(10)
DvKalman2 = DvKalman()
for k in range(Nsamples):
    xm =GetSonar1.measurement()
    Xmsaved[k] = xm
    Xsaved[k] = MovAvgFilter1.next_sample(xm)
    Xksaved[k,:] = DvKalman2.next_sample(xm)
[37]:
plot(t,Xmsaved,'r.',markersize=4)
plot(t,Xsaved)
plot(t,Xksaved[:,0])
xlim([0,10])
ylabel(r'Altitude (m)')
xlabel(r'Time (s)')
legend((r'Measurement' ,r'Moving Average - 10',r'Kalman-pos'),loc='best')
grid();
../_images/nb_examples_Kalman_Basics_practice_49_0.svg
[38]:
plot(t,Xksaved[:,1],'orange')
xlim([0,10])
ylabel(r'Velocity (m/s)')
xlabel(r'Time (s)')
legend((r'Kalman-vel',),loc='best')
grid();
../_images/nb_examples_Kalman_Basics_practice_50_0.svg

Nonlinear Kalman Filter

Chapter 14 Extended Kalman Filter (EKF)

Here we focus on:

  • Replacing \(\mathbf{Ax}_k\) with the nonlinearity \(f(\mathbf{x}_k)\) and \(\mathbf{Hx}_k\) with the nonlinearity \(\mathbf{h}(\mathbf{x}_k)\)
  • How in the end the EKF linearizes the nonlinear model at each time step by using the Jacobian matrices \(\mathbf{A} \equiv \partial \mathbf{f}/\partial \mathbf{x}\) and \(\mathbf{H} \equiv \partial \mathbf{h}/\partial \mathbf{x}\)
  • Assembling the EKF algorithm from the Kalman filter foundation
  • Finally, a Radar tracking example (Kim 14.4)

1ed0990a2c0a41a488acd10028811c07

[39]:
Image('figs/EKF_Filter.png',width='70%')
[39]:
../_images/nb_examples_Kalman_Basics_practice_53_0.png
[40]:
class GetRadar(object):
    """
    A class for generating radar slant range measurements as found in Kim 14.4

    Mark Wickert December 2017
    """


    def __init__ (self,Pos_set = 0, Vel_set = 80.0, Alt_set = 1000, dt = 0.1,
                  var_Vel = 25.0, var_Alt = 100):
        """
        Initialize the object
        """
        self.Posp = Pos_set
        self.Vel_set = Vel_set
        self.Alt_set = Alt_set

        self.dt = dt

        self.var_Vel = var_Vel
        self.var_Alt = var_Alt


    def measurement(self):
        """
        Take a measurement
        """
        # The velocity process with uncertainty
        vel = self.Vel_set + sqrt(self.var_Vel)*random.randn(1)[0]
        # The altitude process with uncertainty
        alt = self.Alt_set + sqrt(self.var_Alt)*random.randn(1)[0]

        # New position
        pos = self.Posp + vel*dt

        # Slant range measurement noise
        v = 0 + pos*0.05*random.randn(1)[0]
        # The slant range
        r = sqrt(pos**2 + alt**2) + v
        self.Posp = pos
        return r
[41]:
class RadarEKF(object):
    """
    Kim Chapter 14.4 Radar Range Tracking

    Python 3.x is assumed so the operator @ can be used for matrix multiply

    Mark Wickert December 2017
    """


    def __init__ (self, dt=0.05, initial_state = [0, 90, 1100]):
        """
        Initialize the object
        """
        self.dt = dt
        self.A = eye(3) + dt*array([[0, 1, 0], [0, 0, 0], [0, 0, 0]])
        # Process model covariance
        self.Q = array([[0, 0, 0], [0, 0.001, 0], [0, 0, 0.001]])
        # Measurement model covariance
        self.R = array([[10]])
        self.x = array(initial_state)
        # Error covariance initialize
        self.P = 10*eye(3)
        # Initialize pos and vel
        self.pos = 0.0
        self.vel = 0.0
        self.alt = 0.0


    def next_sample(self,z):
        """
        Update the Kalman filter state by inputting a new scalar measurement.
        Return the state array as a tuple
        Update all other Kalman filter quantities
        """
        H = self.Hjacob(self.x)
        xp = self.A @ self.x
        Pp = self.A @ self.P @ self.A.T + self.Q

        self.K = Pp @ H.T * inv(H @ Pp @ H.T + self.R)

        self.x = xp + self.K @ (array([z - self.hx(xp)]))
        self.P = Pp - self.K @ H @ Pp

        self.pos = self.x[0]
        self.vel = self.x[1]
        self.alt = self.x[2]
        return self.pos, self.vel, self.alt


    def hx(self,xhat):
        """
        State vector predicted to slant range
        """
        zp = sqrt(xhat[0]**2 + xhat[2]**2)
        return zp


    def Hjacob(self,xp):
        """
        Jacobian used to linearize the measurement matrix H
        given the state vector
        """
        H = zeros((1,3))

        H[0,0] = xp[0]/sqrt(xp[0]**2 + xp[2]**2)
        H[0,1] = 0
        H[0,2] = xp[2]/sqrt(xp[0]**2 + xp[2]**2)
        return H

Run a Simulation

[42]:
Nsamples = 500
dt = 0.05
t = arange(Nsamples)*dt
Nsamples = len(t)
XsavedEKF = zeros((Nsamples,3))
XmsavedEKF = zeros(Nsamples)
ZsavedEKF = zeros(Nsamples)

GetRadar1 = GetRadar()
RadarEKF1 = RadarEKF(dt, initial_state=[0, 90, 1100])
for k in range(Nsamples):
    xm =GetRadar1.measurement()
    XmsavedEKF[k] = xm
    XsavedEKF[k,:] = RadarEKF1.next_sample(xm)
    ZsavedEKF[k] = norm(XsavedEKF[k])
[43]:
figure(figsize=(6,5))
subplot(311)
plot(t,XsavedEKF[:,1])
title(r'Extended Kalman Filter States')
ylabel(r'Vel (m/s)')
xlabel(r'Time (s)')
grid();
subplot(312)
plot(t,XsavedEKF[:,2])
ylabel(r'Alt (m)')
xlabel(r'Time (s)')
grid();
subplot(313)
plot(t,XsavedEKF[:,0])
ylabel(r'Pos (m)')
xlabel(r'Time (s)')
grid();
tight_layout()
../_images/nb_examples_Kalman_Basics_practice_58_0.svg
[44]:
plot(t,XmsavedEKF,'r.',markersize=4)
plot(t,ZsavedEKF)
ylabel(r'Slant Range (m)')
xlabel(r'Time (s)')
legend((r'Measured',r'EKF Estimated'),loc='best')
grid();
../_images/nb_examples_Kalman_Basics_practice_59_0.svg

Chapter 15 Unscented KalmanFilter (UKF)

Here we focus on:

  • The unscented transformation algorithm
  • The estimation of the mean and convariance matrix asscociated with the joint pdf of \(y = f(x)\), where in general \(x\) and \(y\) are vectors, e.g., the state and transformed state
  • Assembling the UKF algorithm from the original KF, but with new and substitute equations due to sigma point generation and transformed sigma points
  • Finally, a Radar tracking example (continuation) from Kim 15.4

720ce39b69d54b4186cc79a3689ae996

[45]:
Image('figs/UKF_Filter.png',width='70%')
[45]:
../_images/nb_examples_Kalman_Basics_practice_62_0.png

Preliminaries

First review how to slice a 2D array into a 2D column or row vector using a third argument set to None. Note this approach works for both right-side and left-side calculations. You know you have done something wrong when you receive a numpy broadcasting error.

[46]:
A = arange(1,21).reshape(4,5)
print('# A 4 x 5 matrix:')
print(A)
print('# Slice out the second column as a 4x1 matrix')
print(A[:,1,None])
print('# Slice out the third row as a 1x5 matrix')
print(A[2,None,:])
# A 4 x 5 matrix:
[[ 1  2  3  4  5]
 [ 6  7  8  9 10]
 [11 12 13 14 15]
 [16 17 18 19 20]]
# Slice out the second column as a 4x1 matrix
[[ 2]
 [ 7]
 [12]
 [17]]
# Slice out the third row as a 1x5 matrix
[[11 12 13 14 15]]
[47]:
def SigmaPoints(xm, P, kappa):
    """
    Calculate the Sigma Points of an unscented Kalman filter

    Mark Wickert December 2017
    Translated P. Kim's program from m-code
    """
    n  = xm.size
    Xi = zeros((n, 2*n+1))  # sigma points = col of Xi
    W  = zeros(2*n+1)
    Xi[:, 0, None] = xm
    W[0] = kappa/(n + kappa)

    U = cholesky((n+kappa)*P)  # U'*U = (n+kappa)*P

    for k in range(n):
        Xi[:, k+1, None] = xm + U[k, None, :].T   # row of U
        W[k+1] = 1/(2*(n+kappa))

    for k in range(n):
        Xi[:, n+k+1, None] = xm - U[k, None, :].T
        W[n+k+1] = 1/(2*(n+kappa))

    return Xi, W
[48]:
def UT(Xi, W, noiseCov = 0):
    """
    Unscented transformation

    Mark Wickert December 2017
    Translated P. Kim's program from m-code
    """
    n, kmax = Xi.shape

    xm = zeros((n,1))
    for k in range(kmax):
        xm += W[k]*Xi[:, k, None]

    xcov = zeros((n, n))
    for k in range(kmax):
        xcov += W[k]*(Xi[:, k, None] - xm)*(Xi[:, k, None] - xm).T

    xcov += noiseCov
    return xm, xcov

Verify that SigmaPoints() and UT() are working:

[49]:
xm = array([[5],[5]])
Px = 9*eye(2)
kappa = 2

Xi, W = SigmaPoints(xm,Px,kappa) # sigma points and weights
xAvg, xCov = UT(Xi, W) # estimate mean vector and covariance matrix using sigma points
[50]:
print(Xi)
print(W)
print(xAvg)
print(xCov)
[[ 5. 11.  5. -1.  5.]
 [ 5.  5. 11.  5. -1.]]
[0.5   0.125 0.125 0.125 0.125]
[[5.]
 [5.]]
[[9. 0.]
 [0. 9.]]
  • The xAvg and xCon values match the original input values of mean and covariance
[51]:
class RadarUKF(object):
    """
    Kim Chapter 15.4 Radar Range Tracking UKF Version

    Python 3.x is assumed so the operator @ can be used for matrix multiply

    Mark Wickert December 2017
    """


    def __init__ (self, dt=0.05, initial_state = [0, 90, 1100]):
        """
        Initialize the object
        """
        self.dt = dt
        self.n = 3
        self.m = 1
        # Process model covariance
        #self.Q = array([[0.01, 0, 0], [0, 0.01, 0], [0, 0, 0.01]])
        self.Q = array([[0, 0, 0], [0, 0.001, 0], [0, 0, 0.001]])
        # Measurement model covariance
        #self.R = array([[100]])
        self.R = array([[10]])
        self.x = array([initial_state]).T
        # Error covariance initialize
        self.P = 100*eye(3)
        self.K = zeros((self.n,1))
        # Initialize pos and vel
        self.pos = 0.0
        self.vel = 0.0
        self.alt = 0.0


    def next_sample(self,z,kappa = 0):
        """
        Update the Kalman filter state by inputting a new scalar measurement.
        Return the state array as a tuple
        Update all other Kalman filter quantities
        """
        Xi, W = SigmaPoints(self.x, self.P, 0)
        fXi = zeros((self.n, 2*self.n + 1))
        for k in range(2*self.n + 1):
            fXi[:, k, None] = self.fx(Xi[:,k,None])
        xp, Pp = UT(fXi, W, self.Q)

        hXi = zeros((self.m, 2*self.n+1))
        for k in range(2*self.n+1):
            hXi[:, k, None] = self.hx(fXi[:,k,None])
        zp, Pz = UT(hXi, W, self.R)

        Pxz = zeros((self.n,self.m))
        for k in range(2*self.n+1):
            Pxz += W[k]*(fXi[:,k,None] - xp) @ (hXi[:, k, None] - zp).T

        self.K = Pxz * inv(Pz)
        self.x = xp + self.K * (z - zp)
        self.P = Pp - self.K @ Pz @ self.K.T

        self.pos = self.x[0]
        self.vel = self.x[1]
        self.alt = self.x[2]
        return self.pos, self.vel, self.alt


    def fx(self,x):
        """
        The function f(x) in Kim
        """
        A = eye(3) + self.dt*array([[0, 1, 0],[0, 0, 0], [0, 0, 0]])
        xp = A @ x
        return xp


    def hx(self,x):
        """
        The range equation r(x1,x3)
        """
        yp = sqrt(x[0]**2 + x[2]**2)
        return yp

Run a Simulation

[52]:
dt = 0.05
Nsamples = 500
t = arange(Nsamples)*dt
Nsamples = len(t)
XsavedUKF = zeros((Nsamples,3))
XmsavedUKF = zeros(Nsamples)
ZsavedUKF = zeros(Nsamples)
KsavedUKF = zeros((Nsamples,3))

GetRadar1 = GetRadar()
RadarUKF1 = RadarUKF(dt,initial_state=[0, 90, 1100])
for k in range(Nsamples):
    xm =GetRadar1.measurement()
    XmsavedUKF[k] = xm
    XsavedUKF[k,:] = RadarUKF1.next_sample(xm)
    ZsavedUKF[k] = norm(XsavedUKF[k])
    KsavedUKF[k,:] = RadarUKF1.K.T
[53]:
figure(figsize=(6,5))
subplot(311)
plot(t,XsavedUKF[:,1])
title(r'Unscented Kalman Filter States')
ylabel(r'Vel (m/s)')
xlabel(r'Time (s)')
grid();
subplot(312)
plot(t,XsavedUKF[:,2])
ylabel(r'Alt (m)')
xlabel(r'Time (s)')
grid();
subplot(313)
plot(t,XsavedUKF[:,0])
ylabel(r'Pos (m)')
xlabel(r'Time (s)')
grid();
tight_layout()
../_images/nb_examples_Kalman_Basics_practice_74_0.svg
[54]:
plot(t,XmsavedUKF,'r.',markersize=4)
plot(t,ZsavedUKF)
ylabel(r'Slant Range (m)')
xlabel(r'Time (s)')
legend((r'Measured',r'UKF Estimated'),loc='best')
grid();
../_images/nb_examples_Kalman_Basics_practice_75_0.svg
  • Take a look at the Kalman gains versus time for each of the states:
[55]:
plot(t,KsavedUKF[:,0])
plot(t,KsavedUKF[:,1])
plot(t,KsavedUKF[:,2])
title(r'Kalman Gain Components versus Time')
ylabel(r'Kalman Gain')
xlabel(r'Time (s)')
legend((r'$K[0]$',r'$K[1]$',r'$K[2]$'),loc='best')
grid();
../_images/nb_examples_Kalman_Basics_practice_77_0.svg