Custom Substates and Signals

In this section, we’ll go over the steps required to write a new substate and signal object. The problem we’ll consider is two-dimensional position estimation.

Writing a Substate Object

As discussed in previous sections, the substate object is the object which stores some component of the state vector, or possibly the entire state vector, depending on the application. Part of the utility of the modest package is that it allows users to quickly write their own substates, and then combine those states in a modular way. In this section, we’ll go through the details of what is required to write a substate object.

Required Methods

In order to function as a substate, there are seven methods which a class must have. These methods are outlined below.

  • __init__() Initialize the object

  • getStateVector() Returns the most recent estimated state vector

  • storeStateVector() Receives an updated state vector (time or measurement updated) and stores it. If the substate has any additional “post-processing” that needs to be done on the substate after an update, this is the place to do it.

  • covariance() Returns the current covariance. This method is only called once, when a substate is added. After that, the ModularFilter class handles the covariance matrix.

  • dimension() Returns the dimension of the substate

  • timeUpdate() Returns the time-update matrices, given system dynamics

  • getMeasurementUpdateMatrices() Returns the measurement-update matrices, given the measurement

    Of these methods, only the last two (timeUpdate and getMeasurementMatrices) are nescessarily specific to substate. The others are often not unique to the substate. Consequently, these functions have been defined in the base class SubState. So, we can inherit from SubState and only have to write the timeUpdate() and getMeasurementUpdateMatrices() methods. We can do that as follows.

from ModularFilter import modest as md
import numpy as np
from scipy.linalg import block_diag

# Define our class
class twoDPositionVelocity(md.substates.SubState):

Now we need to define our methods, including the initialization method. The initialization method has to do a few things. It needs to get the initial estimates for the state vector and covariance matrix, and since it’s inheriting from SubState, it needs to initialize the “super” class.

Initialization Method

def __init__ (self, stateVector, covariance, objectID='', time=0):
    stateVectorHistory = {
        'stateVector': stateVector,
        'covariance': covariance,
        't': time,
        'stateVectorID':0,
        'aPriori':True
    }

    super().__init__(stateDimension=4,stateVectorHistory=stateVectorHistory, objectID=objectID)

This initialization function is pretty rudimentary, in part because SubState does a lot of the work for us (like checking dimensionality). Of course in a more complicated substate we might have to do more initialization.

There is one key bit of initialization that we did have to do: the creation of the stateVectorHistory dictionary. In SubState type classes, the state vector is stored in a dictionary containing (at minimum) the state vector, the covariance, and a unique identifier, the stateVectorID. By default, every state estimate over the entire life of the object is stored in a list of such dictionaries, all handled by the SubState class. This is mainly for analysis (allowing the user to look at the time history of the state), but it is also used by SubState for storing and getting the most recent versions of the state vector when needed. SubState expects to receive this dictionary on initialization; if it doesn’t, or if the dict doesn’t have the right members, SubState will throw an error.

Now that we’ve defined the initialization function, we can define our time and measurement update methods.

Time Update Method

def timeUpdate(self, dT, dynamics={}):
  subF = np.array([[1, dT],[0, 1]])
  F = block_diag(subF, subF)

  accelerationKey = str(self) + 'acceleration'
  if accelerationKey in dynamics:
     subQ = np.array([[dT/4, dT/2],[dT/2, dT]])
     Q = block_diag(subQ, subQ) * dynamics[accelerationKey]['var']
  else:
     Q = np.zeros([self.dimension(), self.dimension()])

  return {'F': F, 'Q': Q}

The main job of this relatively simple function is to generate the time-update equations for an object in two-dimensional motion with acceleration as an input, and pass them out to the caller in a standard dictionary format.

We also check to make sure that the dynamics information we’re interested in is actually contained in the dynamics dictionary. We don’t assume that our substate is the only one running in the filter; there could be others with their own accelerations. Consequently, we build in a check to verify that the dictionary we’re getting contains the acceleration we’re interested in. The method the substate uses to identify it’s dynamics information is up to you as the developer (i.e. you don’t have to follow the “objectID + acceleration” format). As long as the substate knows what it should be looking for, you can use whatever key you want.

Measurement Update Method

The other method that we need to define is the measurement update method. This is generally a little bit more complicated. That’s because there are potentially a lot of different kinds of measurements we need to handle. There could be direct measurements of position (for instance from a GPS receiver), there could be range and/or bearing measurement from known navigational beacons, or there could be velocity measurements. Consequently, the measurement update method can be a bit complex, and it is often convenient to define additional methods to handle individual sub-cases (this is entirely up to the user of course).

def getMeasurementMatrices(self, measurement, source=None):

    if not source:
        return

    HDict = {}
    RDict = {}
    dYDict = {}

    currentStateVector = self.stateVectorHistory[-1]['stateVector']
    currentX = currentStateVector[0]
    currentY = currentStateVector[2]
    measurementPosition = source.position
    positionDifference = np.array([currentX, currentY]) - measurementPosition
    predictedRange = np.linalg.norm(positionDifference)
    predictedAngle = np.arctan2(positionDifference[1],positionDifference[0])

    if 'range' in measurement:

        H = np.array([[
            positionDifference[0] / predictedRange,
            0,
            positionDifference[1] / predictedRange,
            0
        ]])

        R = np.array([[measurement['range']['var']]])

        dY = measurement['range']['value'] - H.dot(currentStateVector)
        matrixKey = str(self) + ' ' + str(source.signalID()) + ' range'
        HDict[matrixKey] = H
        RDict[matrixKey] = R
        dYDict[matrixKey] = dY

    if 'bearingAngle' in measurement:
        H = np.array([[
            -positionDifference[1]/np.square(predictedRange),
            0,
            positionDifference[0]/np.square(predictedRange),
            0
        ]])

        R = np.array([[measurement['bearingAngle']['var']]])

        dY = measurement['bearingAngle']['value'] - predictedAngle

        matrixKey = str(self) + ' ' + str(source.signalID()) + ' bearingAngle'

        HDict[matrixKey] = H
        RDict[matrixKey] = R
        dYDict[matrixKey] = dY

    return {'H': HDict, 'R': RDict, 'dY': dYDict}

There is a lot going on in this method, so let’s unpack it a little bit at a time. First, note the inputs. Any time the getMeasurementUpdateMatrices() method is called, the method expects to receive as arguments the measurement itself, as well as some kind of information about the signal source. (At some point during development, I envisioned instances where this method would be called when no signal source information was present, so this was left as an optional argument. However I don’t think there are currently any actual implementations where this is the case).

Next, we note that the source object is arbitrary. It is up to the getMeasurementUpdateMatrices() method to evaluate what the signal source is, and how to generate the appropriate measurement update matrices. In this simple example, the only checking done is to evaluate whether the signal source exists at all, but in a more complicated implimentation more checking might be nescessary.

As with the dynamics dictionary in the time update method, the measurement matrix method expects to receive the measurement as a dictionary. A measurement may have multiple components as well. For instance, a measurement passed to this method might look something like this:

measurementDict = {
    'range': {
        'value': 14.5,
        'var': 0.1
    }
    'bearingAngle': {
        'value': 0.345,
        'var': 0.001
    },
    'temperature': {
        'value': 22.4,
        'var': 2.1
    }
}

Again, as with the dynamics dictionary, the measurement may contain quantities of interest to the substate along with irrelevant quantities that might be of interest to other, unrelated substates. It is the job of the getMeasurementUpdateMatrices() method to evaluate what quantities are contained in the measurement, and how to build the appropriate measurement matrices. This is what is happening in the if statements.

As written, the substate only cares about two types of measurements: range and bearing angle. getMeasurementUpdateMatrices() check whether either (or both) of these types of measurements are contained in the measurement dictionary, and if so, it builds the appropriate measurement update matrices. The mathematics of the measurement update matrices are not particularly relevant to the mechanics of the modest package, so we won’t worry about the derivations here. (Of course, your equations must be correct for the estimator to function properly!)

It is important to note the output of getMeasurementUpdateMatrices(). Specifically, the output should be three dictionaries: one corresponding to the measurement mapping matrix (often denoted as \(H\)), one corresponding to the measurement noise matrix (often denoted as \(R\)), and one corresponding to the measurement residual matrix (often detnoted as \(\delta y\)). In each of these dictionaries, modest will expect to find key-value pairs, where each key contains a unique label corresponding to the measurement component, and the value corresponds to the sub-component of the measurement matrix that corresponds to that measurement. For example:

HDict = {
    'object1 range': rangeHMatrix
    'object1 bearingAngle': bearingAngleHMatrix
}

RDict = {
    'object1 range': rangeRMatrix
    'object1 bearingAngle': bearingAngleRMatrix
}

dYDict = {
    'object1 range': rangedYMatrix
    'object1 bearingAngle': bearingAngledYMatrix
}

Mathematically, these components will be joined together to form the following matrices.

\[\begin{split}\mathbf{H} = \begin{bmatrix}\textrm{obj. 1 range } H \textrm{ matrix}\\\textrm{obj. 1 bearing } H \textrm{ matrix} \\ \textrm{(other sub }H\textrm{ matrices)}\end{bmatrix}\end{split}\]
\[\begin{split}\mathbf{R} = \begin{bmatrix}\textrm{obj. 1 range } R \textrm{ matrix} & 0 & \vdots \\ 0 &\textrm{obj. 1 bearing } R \textrm{ matrix} & \vdots \\ \cdots & \cdots & \textrm{(other sub }R\textrm{ matrices)}\end{bmatrix}\end{split}\]
\[\begin{split}\boldsymbol{\delta} \mathbf{Y} = \begin{bmatrix}\textrm{obj. 1 range } \delta Y \textrm{ matrix} \\ \textrm{obj. 1 bearing } \delta Y \textrm{ matrix} \\ \textrm{(other sub }\delta Y\textrm{ matrices)}\end{bmatrix}\end{split}\]

The reason for packaging the measurement update matrices in this way is to allow them to be assembled on the fly by the estimator, and combined as needed with other measurement matrices. It is important that the key that is associated with each measurement sub-matrix be unique not only to this substate, but across all possible substate/signal combinations. A combination of the substate name (a unique identifier), the signal ID (also a unique identifier), and the measurement type will ensure this.

Writing a Signal Object

While substates are the objects which manage subcomponents of the state vector, signal objects are the objects that manage the signal model. Any type of signal that you want to use as a measurement of a substate should be modeled as a signal object.

Required Methods

As with substates, there are a few methods which a signal object must have if it is going to function as a signal object. These methods are outlined below.

  • __init__() Initialize the object

  • signalID() Returns the signal’s ID

  • computeAssociationProbability() Computes a non-normalized probability that a given measurement originated from the signal source

    And again, as with substates, there is a base class SignalSource which handles some of the boiler-plate code. In fact, it is possible to initialize a signal source as an instance of the base class SignalSource and use it without any modification. However, in most cases we generally need to make at least some modification. In this case the modifications we need to make are relatively minor, and take place entirely in the initialization method.

Initialization Method

This initialization method is pretty rudimentary. Again, this is in part because the base class SignalSource does some of the work for us. There are two extra bits that we had to take care of.

The first bit of house-keeping is the storing of the state object ID. As currently implemented, each signal source corresponds to a signal associated with a given substate. So, for instance, if you have two objects tracked by a single radar station, you need two signal sources: one to represent a measurement of the first object and one to represent a measurement of the second object. If you had two radar stations, you’d need four signal sources, and so on. Thus, the stateObjectID is the identifier by which the substate associated with this signal source can be located.

Note

It is possible to have a signal source object that is associated with multiple substates. This might be desirable in cases where you have a single object represented by more than one substate. For example, a single spacecraft might be represented by both a position and an attitude substate.

The second house-keeping item is the storing of the “position” attribute. For the type of signal we wish to model, the position from which the measurement is an essential bit of information needed to compute the measurement update. We allow the user to set this position from the beginning.

Of course, any other attributes associated with the signal source could be stored during the initialization function as well.

For this class, this is the only method we need to explicitely define. The rest is taken care of by the base class. However, this may not be the case in all instances. For example, we might want to modify the association probability method to represent measurement noise other than Gaussian. Or, we might want to give the signal source additional methods, such as the ability to generate measurements for simulation purposes. It is up to the user to decide what methods will be beneficial for the particular class.