{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%pylab inline\n", "from inspect import getsource\n", "import scipy.signal as signal\n", "from IPython.display import Audio, display\n", "from IPython.display import Image, SVG" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from gps_helper.smoothing import RecursiveAverage, MovingAverageFilter, LowPassFilter" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pylab.rcParams['savefig.dpi'] = 100 # default 72\n", "%config InlineBackend.figure_formats=['svg'] # SVG inline viewing" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Signal Filtering" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "figure(figsize=(6,5))\n", "f0 =0.1\n", "fs = 100\n", "t = arange(0,20,1/fs)\n", "s = sign(cos(2*pi*.1*t))\n", "subplot(311)\n", "title(r'0.1 Hz Squarewave with RC Lowpass')\n", "ylabel(r'Signal - s')\n", "plot(t,s)\n", "grid()\n", "subplot(312)\n", "std_n = 0.4\n", "r = s + std_n*randn(len(s))\n", "plot(t,r)\n", "ylabel(r's + n')\n", "grid()\n", "subplot(313)\n", "f3 = 3*f0\n", "a = exp(-2*pi*f3/fs)\n", "rf = signal.lfilter([1-a],[1,-a],r)\n", "plot(t,rf)\n", "xlabel(r'Time (s)')\n", "ylabel(r'LPF(s + n)')\n", "grid()\n", "tight_layout()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Measurement Filtering" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(getsource(RecursiveAverage))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(getsource(MovingAverageFilter))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(getsource(LowPassFilter))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Npts = 100\n", "x = 5\n", "v = 0.3*randn(Npts)\n", "z = x + v\n", "yravg = zeros_like(z)\n", "ravg = RecursiveAverage()\n", "y10 = zeros_like(z)\n", "movavg10 = MovingAverageFilter(10)\n", "y20 = zeros_like(z)\n", "movavg20 = MovingAverageFilter(20)\n", "lpf09 = LowPassFilter(0.9)\n", "yfof09 = zeros_like(z)\n", "for k, z_k in enumerate(z):\n", " yravg[k] = ravg.next_sample(z_k)\n", " y10[k] = movavg10.next_sample(z_k)\n", " y20[k] = movavg20.next_sample(z_k)\n", " yfof09[k] = lpf09.next_sample(z_k)\n", "plot(arange(Npts),z,'r.',markersize=4)\n", "plot(arange(Npts),yravg)\n", "plot(arange(Npts),y10)\n", "plot(arange(Npts),y20)\n", "plot(arange(Npts),yfof09)\n", "legend((r'Measurements',r'RecurAvg',r'MovAvg10',\n", " r'MovAvg20',r'First-Order $\\alpha=0.9$'))\n", "ylabel(r'Filter Output')\n", "xlabel(r'Index $n$')\n", "title(r'Measurement Filtering a Noisy Constant')\n", "grid();" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Npts = 100\n", "n = arange(0,100)\n", "x = 5*cos(2*pi*n/200)\n", "v = 0.3*randn(Npts)\n", "z = x + v\n", "yravg = zeros_like(z)\n", "ravg = RecursiveAverage()\n", "y10 = zeros_like(z)\n", "movavg10 = MovingAverageFilter(10)\n", "y20 = zeros_like(z)\n", "movavg20 = MovingAverageFilter(20)\n", "lpf09 = LowPassFilter(0.9)\n", "yfof09 = zeros_like(z)\n", "for k, z_k in enumerate(z):\n", " yravg[k] = ravg.next_sample(z_k)\n", " y10[k] = movavg10.next_sample(z_k)\n", " y20[k] = movavg20.next_sample(z_k)\n", " yfof09[k] = lpf09.next_sample(z_k)\n", "plot(arange(Npts),z,'r.',markersize=4)\n", "plot(arange(Npts),yravg)\n", "plot(arange(Npts),y10)\n", "plot(arange(Npts),y20)\n", "plot(arange(Npts),yfof09)\n", "legend((r'Measurements',r'RecurAvg',r'MovAvg10',\n", " r'MovAvg20',r'First-Order $\\alpha=0.9$'))\n", "ylabel(r'Filter Output')\n", "xlabel(r'Index $n$')\n", "title(r'Measurement Filtering Half Cycle Cosine')\n", "grid();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Kalman Filtering Background\n", "It was in 1960 that the Kalman filter was born. Today we are many many innovations beyond its humble beginnings." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Image('figs/Kalman_variables.png',width='60%')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Image('figs/Kalman_Filter.png',width='70%')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Basic Examples from Kim" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Chapter 10.2: Estimating a Constant Voltage" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class GetVoltage(object):\n", " \"\"\"\n", " A class for generating the battery voltage measurements\n", " \n", " Mark Wickert February 2018\n", " \"\"\"\n", " \n", " def __init__ (self, batt_voltage = 14.4, dt = 0.2, sigma_w = 2):\n", " \"\"\"\n", " Initialize the object\n", " \"\"\"\n", " self.sigma_w = sigma_w\n", " self.Voltage_set = batt_voltage\n", " \n", " self.dt = dt\n", " \n", " \n", " def measurement(self):\n", " \"\"\"\n", " Take a measurement\n", " \"\"\"\n", " w = 0 + self.sigma_w*random.randn(1)[0]\n", " z = self.Voltage_set + w\n", " return z" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class SimpleKalman(object):\n", " \"\"\"\n", " Kim Chapter 10.2 Battery voltage estimation with measurement noise\n", " \n", " Python 3.x is assumed so the operator @ can be used for matrix multiply\n", " \n", " Mark Wickert February 2018\n", " \"\"\"\n", " \n", " \n", " def __init__ (self, dt = 0.2, initial_state = 14, P = 6):\n", " \"\"\"\n", " Initialize the object\n", " \"\"\"\n", " self.dt = dt\n", " self.A = array([[1]])\n", " self.H = array([[1]])\n", " # Process model covariance\n", " self.Q = array([[0]])\n", " # Measurement model covariance \n", " self.R = 4\n", " self.x = array([[initial_state]])\n", " # Error covariance initialize \n", " self.P = P*eye(1)\n", "\n", " \n", " def next_sample(self,z):\n", " \"\"\"\n", " Update the Kalman filter state by inputting a new \n", " scalar measurement. Return the state array as a tuple\n", " Update all other Kalman filter quantities\n", " \"\"\"\n", " xp = self.A @ self.x\n", " Pp = self.A @ self.P @ self.A.T + self.Q\n", " \n", " self.K = Pp @ self.H.T * inv(self.H @ Pp @ self.H.T + self.R)\n", " \n", " self.x = xp + self.K @ (array([[z]] - self.H @ xp))\n", " self.P = Pp - self.K @ self.H @ Pp\n", " \n", " self.volt = self.x[0]\n", " return self.volt" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dt = 0.1\n", "t = arange(0,10+dt,dt)\n", "Xsaved = zeros((len(t),2))\n", "Zsaved = zeros(len(t))\n", "Ksaved = zeros(len(t))\n", "Psaved = zeros(len(t))\n", "\n", "# Create objects for the simulation\n", "GetVoltage1 = GetVoltage(14.0,dt,sigma_w = 2)\n", "SimpleKalman1 = SimpleKalman(initial_state=14)\n", "\n", "for k in range(len(t)):\n", " z = GetVoltage1.measurement()\n", " Xsaved[k,:] = SimpleKalman1.next_sample(z)\n", " Zsaved[k] = z\n", " Ksaved[k] = SimpleKalman1.K\n", " Psaved[k] = SimpleKalman1.P" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plot(t,Zsaved,'r.')\n", "plot(t,Xsaved[:,0])\n", "ylabel(r'Voltage (V)')\n", "xlabel(r'Time (s)')\n", "legend((r'Measured',r'Kalman Filter'),loc='best')\n", "title(r'Voltage Estimation')\n", "grid();" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "subplot(211)\n", "plot(t,Psaved)\n", "ylabel(r'P')\n", "xlabel(r'Time (s)')\n", "title(r'Covariance ($\\sigma_x^2$) as $P_k$')\n", "grid();\n", "subplot(212)\n", "plot(t,Ksaved)\n", "ylabel(r'K')\n", "xlabel(r'Time (s)')\n", "title(r'Kalman Gain $K_k$')\n", "grid();\n", "tight_layout()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plot(t,Ksaved)\n", "ylabel(r'K')\n", "xlabel(r'Time (s)')\n", "grid();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Notes Radial Position Example" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class GetPosVel(object):\n", " \"\"\"\n", " A class for generating position and velocity \n", " measurements and truth values \n", " of the state vector.\n", " \n", " Mark Wickert May 2018\n", " \"\"\"\n", " \n", " \n", " def __init__ (self,pos_set = 0, vel_set = 80.0, dt = 0.1, \n", " Q = [[1,0],[0,3]], R = [[10,0],[0,2]]):\n", " \"\"\"\n", " Initialize the object\n", " \"\"\"\n", " self.actual_pos = pos_set\n", " self.actual_vel = vel_set\n", " \n", " self.Q = array(Q)\n", " self.R = array(R)\n", " self.dt = dt\n", " \n", " \n", " def measurement(self):\n", " \"\"\"\n", " Take a measurement\n", " \"\"\"\n", " # Truth position and velocity\n", " self.actual_vel = self.actual_vel\n", " self.actual_pos = self.actual_pos \\\n", " + self.actual_vel*self.dt\n", " \n", " # Measured value is truth plus measurement error\n", " z1 = self.actual_pos + sqrt(self.R[0,0])*random.randn()\n", " z2 = self.actual_vel + sqrt(self.R[1,1])*random.randn()\n", " return array([[z1],[z2]])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class PosKalman(object):\n", " \"\"\"\n", " Position Estimation from Position and Velocity Measurements\n", " \n", " Python 3.x is assumed so the operator @ can be used for matrix multiply\n", " \n", " Mark Wickert May 2018\n", " \"\"\"\n", "\n", " \n", " def __init__ (self, Q, R, initial_state = [0, 20], dt = 0.1):\n", " \"\"\"\n", " Initialize the object\n", " \"\"\"\n", " self.dt = dt\n", " self.A = array([[1, dt],[0,1]])\n", " self.H = array([[1,0],[0,1]])\n", " # Process model covariance\n", " self.Q = Q\n", " # Measurement model covariance \n", " self.R = R\n", " self.x = array([[initial_state[0]],[initial_state[1]]])\n", " # Error covariance initialize \n", " self.P = 5*eye(2)\n", " # Initialize state\n", " self.x = array([[0.0],[0.0]])\n", "\n", " \n", " def next_sample(self,z):\n", " \"\"\"\n", " Update the Kalman filter state by inputting a new \n", " scalar measurement. Return the state array as a tuple\n", " Update all other Kalman filter quantities\n", " \"\"\"\n", " xp = self.A @ self.x\n", " Pp = self.A @ self.P @ self.A.T + self.Q\n", " \n", " self.K = Pp @ self.H.T * inv(self.H @ Pp @ self.H.T + self.R)\n", " \n", " self.x = xp + self.K @ (z - self.H @ xp)\n", " self.P = Pp - self.K @ self.H @ Pp\n", " return self.x" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run a Simulation" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "dt = 0.1\n", "t = arange(0,10+dt,dt)\n", "Xsaved = zeros((2,len(t)))\n", "Zsaved = zeros((2,len(t)))\n", "Psaved = zeros(len(t))\n", "Vsaved = zeros(len(t))\n", "# Save history of error covariance matrix diagonal \n", "P_diag = zeros((len(t),2))\n", "\n", "# Create objects for the simulation\n", "Q = array([[1,0],[0,3]])\n", "R = array([[10,0],[0,2]])\n", "GetPos1 = GetPosVel(Q=Q,R=R,dt=dt)\n", "PosKalman1 = PosKalman(Q,R, initial_state=[0,80])\n", "\n", "for k in range(len(t)):\n", " # take a measurement\n", " z = GetPos1.measurement()\n", " # Update the Kalman filter\n", " Xsaved[:,k,None] = PosKalman1.next_sample(z)\n", " Zsaved[:,k,None] = z\n", " Psaved[k] = GetPos1.actual_pos\n", " Vsaved[k] = GetPos1.actual_vel\n", " P_diag[k,:] = PosKalman1.P.diagonal()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "figure(figsize=(6,3.5))\n", "plot(t,Zsaved[0,:]-Psaved,'r.')\n", "plot(t,Xsaved[0,:]-Psaved)\n", "xlabel(r'Time (s)')\n", "ylabel(r'Position Error (m)')\n", "title(r'Position Error: $x_1 - p_{true}$')\n", "legend((r'Measurement Error',r'Estimation Error'))\n", "grid();" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "figure(figsize=(6,3.5))\n", "plot(t,Zsaved[1,:]-Vsaved,'r.')\n", "plot(t,Xsaved[1,:]-Vsaved)\n", "xlabel(r'Time (s)')\n", "ylabel(r'Velocity Error (m)')\n", "title(r'Velocity Error: $x_2 - v_{true}$')\n", "legend((r'Measurement Error',r'Estimation Error'))\n", "grid();" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "figure(figsize=(6,3.5))\n", "plot(t,P_diag[:,0])\n", "plot(t,P_diag[:,1])\n", "title(r'Covariance Matrix $\\mathbf{P}$ Diagonal Entries')\n", "ylabel(r'Variance')\n", "xlabel(r'Time (s) (given $T_s = 0.1$s)')\n", "legend((r'$\\sigma_p^2$ (m$^2$)',r'$\\sigma_v^2$ (m$^4$)'))\n", "grid();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Chapter 11.2 & 11.3: Estimating Velocity from Position" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class GetPos(object):\n", " \"\"\"\n", " A class for generating position measurements as found in Kim\n", " \n", " Mark Wickert December 2017\n", " \"\"\"\n", " \n", " \n", " def __init__ (self,Posp = 0, Vel_set = 80.0, dt = 0.1, \n", " var_w = 10.0, var_v = 10.0):\n", " \"\"\"\n", " Initialize the object\n", " \"\"\"\n", " self.Posp = Posp\n", " self.Vel_set = Vel_set\n", " self.Velp = Vel_set\n", " \n", " self.dt = dt\n", " \n", " self.var_w = var_w\n", " self.var_v = var_v\n", " \n", " \n", " def measurement(self):\n", " \"\"\"\n", " Take a measurement\n", " \"\"\"\n", " # The velocity process noise\n", " w = 0 + self.var_w*random.randn(1)[0]\n", " # The position measurement noise\n", " v = 0 + self.var_v*random.randn(1)[0]\n", " \n", " # Update the position measurement\n", " z = self.Posp + self.Velp*self.dt + v\n", " # Also update the truth values of position and velocity\n", " self.Posp = z - v\n", " self.Velp = self.Vel_set + w\n", " return z" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class DvKalman(object):\n", " \"\"\"\n", " Kim Chapter 11.2 Velocity from Position Estimation\n", " \n", " Python 3.x is assumed so the operator @ can be used for matrix multiply\n", " \n", " Mark Wickert December 2017\n", " \"\"\"\n", "\n", " \n", " def __init__ (self,initial_state = [0, 20]):\n", " \"\"\"\n", " Initialize the object\n", " \"\"\"\n", " self.dt = 0.1\n", " self.A = array([[1, self.dt],[0,1]])\n", " self.H = array([[1,0]])\n", " # Process model covariance\n", " self.Q = array([[1,0],[0,3]])\n", " # Measurement model covariance \n", " self.R = 10\n", " self.x = array([[initial_state[0]],[initial_state[1]]])\n", " # Error covariance initialize \n", " self.P = 5*eye(2)\n", " # Initialize pos and vel\n", " self.pos = 0.0\n", " self.vel = 0.0\n", "\n", " \n", " def next_sample(self,z):\n", " \"\"\"\n", " Update the Kalman filter state by inputting a new \n", " scalar measurement. Return the state array as a tuple\n", " Update all other Kalman filter quantities\n", " \"\"\"\n", " xp = self.A @ self.x\n", " Pp = self.A @ self.P @ self.A.T + self.Q\n", " \n", " self.K = Pp @ self.H.T * inv(self.H @ Pp @ self.H.T + self.R)\n", " \n", " self.x = xp + self.K @ (array([[z]] - self.H @ xp))\n", " self.P = Pp - self.K @ self.H @ Pp\n", " \n", " self.pos = self.x[0]\n", " self.vel = self.x[1]\n", " return self.pos, self.vel" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run a Simulation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dt = 0.1\n", "t = arange(0,10+dt,dt)\n", "Xsaved = zeros((len(t),2))\n", "Zsaved = zeros(len(t))\n", "Vsaved = zeros(len(t))\n", "\n", "# Create objects for the simulation\n", "GetPos1 = GetPos()\n", "DvKalman1 = DvKalman()\n", "\n", "for k in range(len(t)):\n", " z = GetPos1.measurement()\n", " # pos, vel = DvKalman1.update(z)\n", " \n", " Xsaved[k,:] = DvKalman1.next_sample(z)\n", " Zsaved[k] = z\n", " Vsaved[k] = GetPos1.Velp" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plot(t,Zsaved,'r.')\n", "plot(t,Xsaved[:,0])\n", "ylabel(r'Position (m)')\n", "xlabel(r'Time (s)')\n", "legend((r'Measured',r'Kalman Filter'),loc='best')\n", "grid();" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plot(t,Vsaved,'r.')\n", "plot(t,Xsaved[:,1])\n", "ylabel(r'Velocity (m/s)')\n", "xlabel(r'Time (s)')\n", "legend((r'True Speed',r'Kalman Filter'),loc='best')\n", "grid();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Chapter 11.4: Estimating Position from Velocity" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class GetVel(object):\n", " \"\"\"\n", " A class for generating velocity measurements as found in Kim 11.4\n", " \n", " Mark Wickert December 2017\n", " \"\"\"\n", " \n", " \n", " def __init__ (self,Pos_set = 0, Vel_set = 80.0, dt = 0.1, var_v = 10.0):\n", " \"\"\"\n", " Initialize the object\n", " \"\"\"\n", " self.Posp = Pos_set\n", " self.Vel_set = Vel_set\n", " self.Velp = Vel_set\n", " \n", " self.dt = dt\n", " \n", " self.var_v = var_v\n", " \n", " \n", " def measurement(self):\n", " \"\"\"\n", " Take a measurement\n", " \"\"\"\n", " # The velocity measurement noise\n", " v = 0 + self.var_v*random.randn(1)[0]\n", " \n", " # Also update the truth values of position and velocity\n", " self.Posp += self.Velp*self.dt\n", " self.Velp = self.Vel_set + v\n", " z = self.Velp\n", " return z" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class IntKalman(object):\n", " \"\"\"\n", " Kim Chapter 11.4 Position from Velocity Estimation\n", " \n", " Python 3.x is assumed so the operator @ can be used for matrix multiply\n", " \n", " Mark Wickert December 2017\n", " \"\"\"\n", "\n", " \n", " def __init__ (self,initial_state = [0, 20]):\n", " \"\"\"\n", " Initialize the object\n", " \"\"\"\n", " self.dt = 0.1\n", " self.A = array([[1, dt],[0,1]])\n", " self.H = array([[0,1]])\n", " # Process model covariance\n", " self.Q = array([[1,0],[0,3]])\n", " # Measurement model covariance \n", " self.R = 10\n", " self.x = array([[initial_state[0]],[initial_state[1]]])\n", " # Error covariance initialize \n", " self.P = 5*eye(2)\n", " # Initialize pos and vel\n", " self.pos = 0.0\n", " self.vel = 0.0\n", "\n", " \n", " def next_sample(self,z):\n", " \"\"\"\n", " Update the Kalman filter state by inputting a new scalar measurement.\n", " Return the state array as a tuple\n", " Update all other Kalman filter quantities\n", " \"\"\"\n", " xp = self.A @ self.x\n", " Pp = self.A @ self.P @ self.A.T + self.Q\n", " \n", " self.K = Pp @ self.H.T * inv(self.H @ Pp @ self.H.T + self.R)\n", " \n", " self.x = xp + self.K @ (array([[z]] - self.H @ xp))\n", " self.P = Pp - self.K @ self.H @ Pp\n", " \n", " self.pos = self.x[0]\n", " self.vel = self.x[1]\n", " return self.pos, self.vel" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dt = 0.1\n", "t = arange(0,10+dt,dt)\n", "Xsaved = zeros((len(t),2))\n", "Zsaved = zeros(len(t))\n", "Psaved = zeros(len(t))\n", "\n", "# Create objects for the simulation\n", "GetVel1 = GetVel()\n", "IntKalman1 = IntKalman()\n", "\n", "for k in range(len(t)):\n", " z = GetVel1.measurement()\n", " \n", " Xsaved[k,:] = IntKalman1.next_sample(z)\n", " Zsaved[k] = z\n", " Psaved[k] = GetVel1.Posp" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plot(t,Zsaved,'r.')\n", "plot(t,Xsaved[:,1])\n", "ylabel(r'Velocity (m/s)')\n", "xlabel(r'Time (s)')\n", "legend((r'Measurements',r'Kalman Filter'),loc='best')\n", "grid();" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plot(t,Xsaved[:,0])\n", "plot(t,Psaved)\n", "ylabel(r'Position (m)')\n", "xlabel(r'Time (s)')\n", "legend((r'True Position' ,r'Kalman Filter',),loc='best')\n", "grid();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Chapter 11.5: Measuring Velocity with Sonar" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# From scipy.io \n", "from scipy.io import loadmat" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class GetSonar(object):\n", " \"\"\"\n", " A class for playing back sonar altitude measurements as found in Kim 2.4\n", " and later used in Kim 11.5\n", " \n", " Mark Wickert December 2017\n", " \"\"\"\n", " \n", " \n", " def __init__ (self,):\n", " \"\"\"\n", " Initialize the object\n", " \"\"\"\n", " sonarD = loadmat('SonarAlt')\n", " self.h = sonarD['sonarAlt'].flatten()\n", " self.Max_pts = len(self.h)\n", " self.k = 0\n", " \n", " \n", " def measurement(self):\n", " \"\"\"\n", " Take a measurement\n", " \"\"\"\n", " \n", " h = self.h[self.k]\n", " self.k += 1\n", " if self.k > self.Max_pts:\n", " print('Recycling data by starting over')\n", " self.k = 0\n", " return h" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Nsamples = 500\n", "t = arange(Nsamples)*.02\n", "Xsaved = zeros(Nsamples)\n", "Xmsaved = zeros(Nsamples)\n", "Xksaved = zeros((Nsamples,2))\n", "\n", "GetSonar1 = GetSonar()\n", "MovAvgFilter1 = MovingAverageFilter(10)\n", "DvKalman2 = DvKalman()\n", "for k in range(Nsamples):\n", " xm =GetSonar1.measurement()\n", " Xmsaved[k] = xm\n", " Xsaved[k] = MovAvgFilter1.next_sample(xm)\n", " Xksaved[k,:] = DvKalman2.next_sample(xm)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plot(t,Xmsaved,'r.',markersize=4)\n", "plot(t,Xsaved)\n", "plot(t,Xksaved[:,0])\n", "xlim([0,10])\n", "ylabel(r'Altitude (m)')\n", "xlabel(r'Time (s)')\n", "legend((r'Measurement' ,r'Moving Average - 10',r'Kalman-pos'),loc='best')\n", "grid();" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plot(t,Xksaved[:,1],'orange')\n", "xlim([0,10])\n", "ylabel(r'Velocity (m/s)')\n", "xlabel(r'Time (s)')\n", "legend((r'Kalman-vel',),loc='best')\n", "grid();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Nonlinear Kalman Filter\n", "## Chapter 14 Extended Kalman Filter (EKF)\n", "Here we focus on:\n", "\n", "* Replacing $\\mathbf{Ax}_k$ with the nonlinearity $f(\\mathbf{x}_k)$ and $\\mathbf{Hx}_k$ with the nonlinearity $\\mathbf{h}(\\mathbf{x}_k)$\n", "* How in the end the EKF linearizes the nonlinear model at each time step by using the Jacobian matrices $\\mathbf{A} \\equiv \\partial \\mathbf{f}/\\partial \\mathbf{x}$ and $\\mathbf{H} \\equiv \\partial \\mathbf{h}/\\partial \\mathbf{x}$\n", "* Assembling the EKF algorithm from the Kalman filter foundation\n", "* Finally, a Radar tracking example (Kim 14.4)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Image('figs/EKF_Filter.png',width='70%')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class GetRadar(object):\n", " \"\"\"\n", " A class for generating radar slant range measurements as found in Kim 14.4\n", " \n", " Mark Wickert December 2017\n", " \"\"\"\n", " \n", " \n", " def __init__ (self,Pos_set = 0, Vel_set = 80.0, Alt_set = 1000, dt = 0.1,\n", " var_Vel = 25.0, var_Alt = 100):\n", " \"\"\"\n", " Initialize the object\n", " \"\"\"\n", " self.Posp = Pos_set\n", " self.Vel_set = Vel_set\n", " self.Alt_set = Alt_set\n", " \n", " self.dt = dt\n", " \n", " self.var_Vel = var_Vel\n", " self.var_Alt = var_Alt\n", " \n", " \n", " def measurement(self):\n", " \"\"\"\n", " Take a measurement\n", " \"\"\"\n", " # The velocity process with uncertainty\n", " vel = self.Vel_set + sqrt(self.var_Vel)*random.randn(1)[0]\n", " # The altitude process with uncertainty\n", " alt = self.Alt_set + sqrt(self.var_Alt)*random.randn(1)[0]\n", "\n", " # New position\n", " pos = self.Posp + vel*dt\n", " \n", " # Slant range measurement noise\n", " v = 0 + pos*0.05*random.randn(1)[0]\n", " # The slant range\n", " r = sqrt(pos**2 + alt**2) + v\n", " self.Posp = pos\n", " return r" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class RadarEKF(object):\n", " \"\"\"\n", " Kim Chapter 14.4 Radar Range Tracking\n", " \n", " Python 3.x is assumed so the operator @ can be used for matrix multiply\n", " \n", " Mark Wickert December 2017\n", " \"\"\"\n", "\n", " \n", " def __init__ (self, dt=0.05, initial_state = [0, 90, 1100]):\n", " \"\"\"\n", " Initialize the object\n", " \"\"\"\n", " self.dt = dt\n", " self.A = eye(3) + dt*array([[0, 1, 0], [0, 0, 0], [0, 0, 0]])\n", " # Process model covariance\n", " self.Q = array([[0, 0, 0], [0, 0.001, 0], [0, 0, 0.001]])\n", " # Measurement model covariance \n", " self.R = array([[10]])\n", " self.x = array(initial_state)\n", " # Error covariance initialize \n", " self.P = 10*eye(3)\n", " # Initialize pos and vel\n", " self.pos = 0.0\n", " self.vel = 0.0\n", " self.alt = 0.0\n", "\n", " \n", " def next_sample(self,z):\n", " \"\"\"\n", " Update the Kalman filter state by inputting a new scalar measurement.\n", " Return the state array as a tuple\n", " Update all other Kalman filter quantities\n", " \"\"\"\n", " H = self.Hjacob(self.x)\n", " xp = self.A @ self.x\n", " Pp = self.A @ self.P @ self.A.T + self.Q\n", " \n", " self.K = Pp @ H.T * inv(H @ Pp @ H.T + self.R)\n", " \n", " self.x = xp + self.K @ (array([z - self.hx(xp)]))\n", " self.P = Pp - self.K @ H @ Pp\n", " \n", " self.pos = self.x[0]\n", " self.vel = self.x[1]\n", " self.alt = self.x[2]\n", " return self.pos, self.vel, self.alt\n", " \n", " \n", " def hx(self,xhat):\n", " \"\"\"\n", " State vector predicted to slant range\n", " \"\"\"\n", " zp = sqrt(xhat[0]**2 + xhat[2]**2)\n", " return zp\n", " \n", " \n", " def Hjacob(self,xp):\n", " \"\"\"\n", " Jacobian used to linearize the measurement matrix H\n", " given the state vector\n", " \"\"\"\n", " H = zeros((1,3))\n", " \n", " H[0,0] = xp[0]/sqrt(xp[0]**2 + xp[2]**2)\n", " H[0,1] = 0\n", " H[0,2] = xp[2]/sqrt(xp[0]**2 + xp[2]**2)\n", " return H" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run a Simulation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Nsamples = 500\n", "dt = 0.05\n", "t = arange(Nsamples)*dt\n", "Nsamples = len(t)\n", "XsavedEKF = zeros((Nsamples,3))\n", "XmsavedEKF = zeros(Nsamples)\n", "ZsavedEKF = zeros(Nsamples)\n", "\n", "GetRadar1 = GetRadar()\n", "RadarEKF1 = RadarEKF(dt, initial_state=[0, 90, 1100])\n", "for k in range(Nsamples):\n", " xm =GetRadar1.measurement()\n", " XmsavedEKF[k] = xm\n", " XsavedEKF[k,:] = RadarEKF1.next_sample(xm)\n", " ZsavedEKF[k] = norm(XsavedEKF[k])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "figure(figsize=(6,5))\n", "subplot(311)\n", "plot(t,XsavedEKF[:,1])\n", "title(r'Extended Kalman Filter States')\n", "ylabel(r'Vel (m/s)')\n", "xlabel(r'Time (s)')\n", "grid();\n", "subplot(312)\n", "plot(t,XsavedEKF[:,2])\n", "ylabel(r'Alt (m)')\n", "xlabel(r'Time (s)')\n", "grid();\n", "subplot(313)\n", "plot(t,XsavedEKF[:,0])\n", "ylabel(r'Pos (m)')\n", "xlabel(r'Time (s)')\n", "grid();\n", "tight_layout()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plot(t,XmsavedEKF,'r.',markersize=4)\n", "plot(t,ZsavedEKF)\n", "ylabel(r'Slant Range (m)')\n", "xlabel(r'Time (s)')\n", "legend((r'Measured',r'EKF Estimated'),loc='best')\n", "grid();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Chapter 15 Unscented KalmanFilter (UKF)\n", "Here we focus on:\n", "\n", "* The unscented transformation algorithm\n", "* The estimation of the mean and convariance matrix asscociated with the joint pdf of $y = f(x)$, where in general $x$ and $y$ are vectors, e.g., the state and transformed state\n", "* Assembling the UKF algorithm from the original KF, but with new and substitute equations due to sigma point generation and transformed sigma points\n", "* Finally, a Radar tracking example (continuation) from Kim 15.4" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Image('figs/UKF_Filter.png',width='70%')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Preliminaries\n", "First review how to slice a 2D array into a 2D column or row vector using a third argument set to `None`. Note this approach works for both right-side and left-side calculations. You know you have done something wrong when you receive a numpy `broadcasting` error." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "A = arange(1,21).reshape(4,5)\n", "print('# A 4 x 5 matrix:')\n", "print(A)\n", "print('# Slice out the second column as a 4x1 matrix')\n", "print(A[:,1,None])\n", "print('# Slice out the third row as a 1x5 matrix')\n", "print(A[2,None,:])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def SigmaPoints(xm, P, kappa):\n", " \"\"\"\n", " Calculate the Sigma Points of an unscented Kalman filter\n", " \n", " Mark Wickert December 2017\n", " Translated P. Kim's program from m-code\n", " \"\"\"\n", " n = xm.size\n", " Xi = zeros((n, 2*n+1)) # sigma points = col of Xi\n", " W = zeros(2*n+1)\n", " Xi[:, 0, None] = xm\n", " W[0] = kappa/(n + kappa)\n", "\n", " U = cholesky((n+kappa)*P) # U'*U = (n+kappa)*P\n", " \n", " for k in range(n):\n", " Xi[:, k+1, None] = xm + U[k, None, :].T # row of U\n", " W[k+1] = 1/(2*(n+kappa))\n", "\n", " for k in range(n):\n", " Xi[:, n+k+1, None] = xm - U[k, None, :].T\n", " W[n+k+1] = 1/(2*(n+kappa))\n", "\n", " return Xi, W" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def UT(Xi, W, noiseCov = 0):\n", " \"\"\"\n", " Unscented transformation\n", " \n", " Mark Wickert December 2017\n", " Translated P. Kim's program from m-code\n", " \"\"\"\n", " n, kmax = Xi.shape\n", "\n", " xm = zeros((n,1))\n", " for k in range(kmax):\n", " xm += W[k]*Xi[:, k, None]\n", " \n", " xcov = zeros((n, n))\n", " for k in range(kmax):\n", " xcov += W[k]*(Xi[:, k, None] - xm)*(Xi[:, k, None] - xm).T\n", "\n", " xcov += noiseCov\n", " return xm, xcov" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Verify that `SigmaPoints()` and `UT()` are working:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "xm = array([[5],[5]])\n", "Px = 9*eye(2)\n", "kappa = 2\n", "\n", "Xi, W = SigmaPoints(xm,Px,kappa) # sigma points and weights\n", "xAvg, xCov = UT(Xi, W) # estimate mean vector and covariance matrix using sigma points" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(Xi)\n", "print(W)\n", "print(xAvg)\n", "print(xCov)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* The `xAvg` and `xCon` values match the original input values of mean and covariance" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class RadarUKF(object):\n", " \"\"\"\n", " Kim Chapter 15.4 Radar Range Tracking UKF Version\n", " \n", " Python 3.x is assumed so the operator @ can be used for matrix multiply\n", " \n", " Mark Wickert December 2017\n", " \"\"\"\n", "\n", " \n", " def __init__ (self, dt=0.05, initial_state = [0, 90, 1100]):\n", " \"\"\"\n", " Initialize the object\n", " \"\"\"\n", " self.dt = dt\n", " self.n = 3\n", " self.m = 1\n", " # Process model covariance\n", " #self.Q = array([[0.01, 0, 0], [0, 0.01, 0], [0, 0, 0.01]])\n", " self.Q = array([[0, 0, 0], [0, 0.001, 0], [0, 0, 0.001]])\n", " # Measurement model covariance \n", " #self.R = array([[100]])\n", " self.R = array([[10]])\n", " self.x = array([initial_state]).T\n", " # Error covariance initialize \n", " self.P = 100*eye(3)\n", " self.K = zeros((self.n,1))\n", " # Initialize pos and vel\n", " self.pos = 0.0\n", " self.vel = 0.0\n", " self.alt = 0.0\n", "\n", " \n", " def next_sample(self,z,kappa = 0):\n", " \"\"\"\n", " Update the Kalman filter state by inputting a new scalar measurement.\n", " Return the state array as a tuple\n", " Update all other Kalman filter quantities\n", " \"\"\"\n", " Xi, W = SigmaPoints(self.x, self.P, 0)\n", " fXi = zeros((self.n, 2*self.n + 1))\n", " for k in range(2*self.n + 1):\n", " fXi[:, k, None] = self.fx(Xi[:,k,None])\n", " xp, Pp = UT(fXi, W, self.Q)\n", " \n", " hXi = zeros((self.m, 2*self.n+1))\n", " for k in range(2*self.n+1):\n", " hXi[:, k, None] = self.hx(fXi[:,k,None])\n", " zp, Pz = UT(hXi, W, self.R)\n", " \n", " Pxz = zeros((self.n,self.m))\n", " for k in range(2*self.n+1):\n", " Pxz += W[k]*(fXi[:,k,None] - xp) @ (hXi[:, k, None] - zp).T\n", " \n", " self.K = Pxz * inv(Pz)\n", " self.x = xp + self.K * (z - zp)\n", " self.P = Pp - self.K @ Pz @ self.K.T\n", " \n", " self.pos = self.x[0]\n", " self.vel = self.x[1]\n", " self.alt = self.x[2]\n", " return self.pos, self.vel, self.alt\n", " \n", " \n", " def fx(self,x):\n", " \"\"\"\n", " The function f(x) in Kim\n", " \"\"\"\n", " A = eye(3) + self.dt*array([[0, 1, 0],[0, 0, 0], [0, 0, 0]])\n", " xp = A @ x\n", " return xp\n", " \n", " \n", " def hx(self,x):\n", " \"\"\"\n", " The range equation r(x1,x3)\n", " \"\"\"\n", " yp = sqrt(x[0]**2 + x[2]**2)\n", " return yp" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run a Simulation" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "dt = 0.05\n", "Nsamples = 500\n", "t = arange(Nsamples)*dt\n", "Nsamples = len(t)\n", "XsavedUKF = zeros((Nsamples,3))\n", "XmsavedUKF = zeros(Nsamples)\n", "ZsavedUKF = zeros(Nsamples)\n", "KsavedUKF = zeros((Nsamples,3))\n", "\n", "GetRadar1 = GetRadar()\n", "RadarUKF1 = RadarUKF(dt,initial_state=[0, 90, 1100])\n", "for k in range(Nsamples):\n", " xm =GetRadar1.measurement()\n", " XmsavedUKF[k] = xm\n", " XsavedUKF[k,:] = RadarUKF1.next_sample(xm)\n", " ZsavedUKF[k] = norm(XsavedUKF[k])\n", " KsavedUKF[k,:] = RadarUKF1.K.T" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "figure(figsize=(6,5))\n", "subplot(311)\n", "plot(t,XsavedUKF[:,1])\n", "title(r'Unscented Kalman Filter States')\n", "ylabel(r'Vel (m/s)')\n", "xlabel(r'Time (s)')\n", "grid();\n", "subplot(312)\n", "plot(t,XsavedUKF[:,2])\n", "ylabel(r'Alt (m)')\n", "xlabel(r'Time (s)')\n", "grid();\n", "subplot(313)\n", "plot(t,XsavedUKF[:,0])\n", "ylabel(r'Pos (m)')\n", "xlabel(r'Time (s)')\n", "grid();\n", "tight_layout()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plot(t,XmsavedUKF,'r.',markersize=4)\n", "plot(t,ZsavedUKF)\n", "ylabel(r'Slant Range (m)')\n", "xlabel(r'Time (s)')\n", "legend((r'Measured',r'UKF Estimated'),loc='best')\n", "grid();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Take a look at the Kalman gains versus time for each of the states:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plot(t,KsavedUKF[:,0])\n", "plot(t,KsavedUKF[:,1])\n", "plot(t,KsavedUKF[:,2])\n", "title(r'Kalman Gain Components versus Time')\n", "ylabel(r'Kalman Gain')\n", "xlabel(r'Time (s)')\n", "legend((r'$K[0]$',r'$K[1]$',r'$K[2]$'),loc='best')\n", "grid();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## References\n", "\n", "1. [Phil Kim, *Kalman Filtering for Beginners with MATLAB Examples*, 2011.](https://www.amazon.com/Kalman-Filter-Beginners-MATLAB-Examples/dp/1463648359/ref=sr_1_9?ie=UTF8&qid=1514912616&sr=8-9&keywords=Kalman+Filtering)\n", "2. [Robert Brown and Patrick Hwang, *Introduction to Random Signals and Applied Kalman Filtering*, 4th edition, 2012.](https://www.amazon.com/Introduction-Signals-Applied-Filtering-Exercises/dp/0470609699/ref=sr_1_3?ie=UTF8&qid=1514912616&sr=8-3&keywords=Kalman+Filtering)\n", "3. [Elliot Kaplan, editor, *Understanding GPS Principles and Applications*, 1996 (3rd edition available).](https://www.amazon.com/Understanding-Principles-Applications-Artech-Communications/dp/0890067937/ref=sr_1_2?ie=UTF8&qid=1516027849&sr=8-2&keywords=UNderstanding+GPS)\n", "4. [Dan Simon, *Optimal State Estimation*, 2006.](https://www.amazon.com/Optimal-State-Estimation-Nonlinear-Approaches/dp/0471708585/ref=sr_1_1?s=books&ie=UTF8&qid=1516027927&sr=1-1&keywords=Optimal+State+Estimation)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.6" } }, "nbformat": 4, "nbformat_minor": 2 }