## @file CorrelationVector
# This package contains the #CorrelationVector class
import numpy as np
#from numpy import sin, cos, arcsin, arccos, arctan2, square, sqrt, abs, power
import matplotlib.pyplot as plt
from . import substate
from .. modularfilter import ModularFilter
from . oneDimensionalPositionVelocity import oneDPositionVelocity
from .. signals.oneDimensionalObject import oneDObjectMeasurement
from .. utils import covarianceContainer
from scipy.linalg import block_diag
from scipy.special import factorial
from math import isnan
## @class CorrelationVector
# @brief CorrelationVector estimates the correlation vector and delay between
# a signal and a time-delayed measurement of that signal
#
# @details
# This class contains an estimator which estimates the correlation vector
# between a signal (the #__trueSignal__) and measurements of that signal. This
# correlation vector is then used to estimate the delay between the
# #__trueSignal__ and the measurements of that signal.
#
# The estimator in this class currently assumes that the signal source is
# "distant," or infinitely far away. This implies that the unit vector to the
# signal source is perfectly known, and not changing. A later implementation
# could include the option of having a non-distant signal source, in which the
# unit vector is changing as a function of position and therefore uncertain.
#
# @note This class is essentially an implementation of the estimator presented in
# <a href="https://doi.org/10.2514/1.G002650">
# Recursive Range Estimation Using Astrophysical Signals of Opportunity</a>,
# J. Runnels, D. Gebre, Journal of Guidance, Control and Dynamics, 2017. Some
# equations from the paper are included in the class documentation for
# reference. A more detailed discussion and derivation of the estimator can
# be found in the journal article..
[docs]class CorrelationVector(substate.SubState):
## @fun #__init__ is responsible for initializing a correlation vector
# estimator
#
# @details The primary function of the #__init__ method is to initialize
# the correlation vector estimator, and store the relevant user inputs. A
# few key user inputs are required in order to initialize the filter.
# Additionally, because the algorithm is relatively complicated, there are
# a number of optional tuning parameters which may be inputed at
# initialization.
#
# In general, the parameters which are required inputs are the ones that
# are critical for initialization of the filter, and should not be changed
# during the course of the filter's lifetime. These inputs are stored as
# "private" variables; indicating that the should not be changed during
# the object's lifetime.
#
# The optional inputs, on the other hand, are inputs which are used in the
# estimation functions (#timeUpdate, #getMeasurementMatrices, etc.).
# These parameters could conceivably be changed during the lifetime of the
# filter without causing problems, and the user may want to change them
# depending on external factors. These parameters are initalized with
# default values, and stored as public variables that the user can in
# theory change.
#
# There are also a set of class variables which are publicly accessable
# and which hold the most recent state estimate. These exist primarily
# for convenience, and are never actually used within the class.
# Modifying them will have no affect on the state estimates. The only way
# to modify a state estimate is through the #storeStateVector method.
#
# The #__init__ method also checks the user inputs to ensure that they are
# consistent with how they will be used in the class (where applicable).
#
# The trueSignal input is checked to see whether it has the following
# methods:
# - flux()
# - signalID()
# - unitVec()
#
# @param trueSignal An object that describes the signal for which
# correlation should be estimated.
# @param filterOrder The number of bins or "taps" in the correlation vector
# @param dT The sampling period, or time-step between bins in the
# correlation vector
#
# @param #t (optional) The initial starting time. If no value is passed,
# initialized to zero by default.
# @param #correlationVector (optional) The initial value of the
# correlation vector. If not supplied, the correlation vector will be
# initialized based on the filter #__dT__ and maximum flux of
# the #__trueSignal__.
# @param #correlationVectorCovariance (optional) The initial value of the
# correlation vector estimate covariance. If not supplied, the covariance
# matrix will be initialized based on the filter #__dT__ and maximum flux
# of #__trueSignal__.
# @param #signalDelay (optional) The initial estimate of delay between
# the #__trueSignal__ and the signal measurements. If not supplied,
# #signalDelay is initialized to zero.
# @param #delayVar (optional) The variance of the estimate of delay
# @param #aPriori (optional) Indicates whether initial estimates are a
# priori or a posteriori. Default=True
#
# @param #centerPeak (optional) Boolean indicating whether the correlation
# vector should be "shifted" after each update to keep the peak centered
# at the zero index. Default is True.
# @param #peakFitPoints (optional) Number of points used on either side of
# max for quadratic fit in #computeSignalDelay. Minimum is 1, default is 1.
# @param #processNoise (optional) Scalar term of additional process noise
# added to covariance matrix in time update. Default is 1e-12
# @param #measurementNoiseScaleFactor (optional) Scale factor to inflate
# the measurement noise. Default is 1.
def __init__(
self,
trueSignal,
filterOrder,
dT,
t=0,
correlationVector=None,
correlationVectorCovariance=None,
signalTDOA=0,
TDOAVar=0,
aPriori=True,
centerPeak=True,
peakFitPoints=1,
processNoise=1e-12,
measurementNoiseScaleFactor=1,
peakLockThreshold=1,
covarianceStorage='covariance',
internalNavFilter=None,
navProcessNoise=1,
tdoaStdDevThreshold=None,
velStdDevThreshold=None,
tdoaNoiseScaleFactor=None,
velocityNoiseScaleFactor=None,
storeLastStateVectors=0,
vInitial=None,
aInitial=None,
gradInitial=None,
peakEstimator='EK'
):
print('updated correlation filter')
self.peakLockThreshold = peakLockThreshold
self.peakCenteringDT = 0
self.peakOffsetFromCenter = 0
self.navProcessNoise = navProcessNoise
"""
This is the default process noise that is injected into the navigation states as noise in the derivative of the highest order state.
"""
## @brief #__unitVecToSignal__ is a unit vector which points to the signal
# source
self.__unitVecToSignal__ = trueSignal.unitVec()
## @brief #__trueSignal__ is a signal object that contains the "true"
# signal for which the correlation vector is being estimated
self.__trueSignal__ = trueSignal
## @brief #__filterOrder__ is the number of "taps" in the estimated
# correlation vector, #correlationVector.
self.__filterOrder__ = filterOrder
## @brief #__dT__ is the "sample period" or "bin size" of the estimated
# correlation vector
self.__dT__ = dT
## @brief #t The current time
self.t = t
## @brief #aPriori Indicates whether the current state vector is the
# result of a time update (#aPriori = True) or a measurement update
# (#aPriori = False)
self.aPriori = aPriori
if correlationVector is None:
correlationVector = (
np.ones(self.__filterOrder__) *
self.__trueSignal__.peakAmplitude * self.__dT__
)
## @brief #correlationVector is the current estimate of the
# correlation vector between the incoming signal measurements and the
# #__trueSignal__
self.correlationVector = correlationVector
if correlationVectorCovariance is None:
if covarianceStorage == 'covariance':
correlationVectorCovariance = (
np.eye(self.__filterOrder__) *
np.square(self.__trueSignal__.peakAmplitude * self.__dT__)
)
elif covarianceStorage == 'cholesky':
correlationVectorCovariance = (
np.eye(self.__filterOrder__) *
self.__trueSignal__.peakAmplitude * self.__dT__
)
# Store the correlation vector covariance in a container
## @brief #correlationVectorCovariance is the covariance matrix of the
# correlation vector estimate, #correlationVector
self.correlationVectorCovariance = correlationVectorCovariance
## @brief #signalDelay is the current estimate of the delay between
# the incoming signal measurements and the #__trueSignal__
self.signalTDOA = signalTDOA
## @brief #delayVar is the variance of the signal delay estimate
# #signalDelay
self.TDOAVar = TDOAVar
## @brief #centerPeak indicates whether the correlation vector is
# shifted to maintain the peak at the middle tap
self.centerPeak = centerPeak
## @brief #peakLock indicates whether the current estimate of
# correlation vector and peak location is accurate enough to "know"
# that we've locked on to the correct peak.
self.peakLock = False
## @brief #peakFitPoints is a variable which controls the number of
# points used for quadratically estimating the location of the
# correlation vector peak
self.peakFitPoints = peakFitPoints
## @brief #processNoise is the scalar value used to generate an
# additional process noise term in #timeUpdate.
self.processNoise = processNoise
## @brief #measurementNoiseScaleFactor is a factor used to scale the
# measurement noise matrix. The default value is 1 (no scaling).
self.measurementNoiseScaleFactor = measurementNoiseScaleFactor
self.peakEstimator = peakEstimator
"""
String that determines which algorithm is used to estimate peak. Use either EK (extended Kalman Filter) or UK (Unscented)
"""
self.__halfLength__ = int(np.ceil(self.__filterOrder__ / 2))
self.__halfLengthSeconds__ = self.__halfLength__ * self.__dT__
xAxis = np.linspace(0, self.__filterOrder__-1, self.__filterOrder__)
self.xAxis = xAxis * self.__dT__
self.__xVec__ = np.linspace(
0,
self.peakFitPoints * 2,
(self.peakFitPoints * 2) + 1
)
self.internalNavFilter = internalNavFilter
print(internalNavFilter)
if self.internalNavFilter == 'none':
self.internalNavFilter = None
self.INF_type = 'none'
elif self.internalNavFilter == 'deep':
self.INF_type = 'deep'
elif self.internalNavFilter:
self.INF_type = 'external'
else:
self.internalNavFilter = None
self.INF_type = 'none'
stateVector = correlationVector
svCovariance = correlationVectorCovariance
if self.INF_type == 'deep':
if not vInitial:
raise ValueError('In order to use the deep internal navigation filter, you must initialize. Filter expects to receive at least vInitial, but received None')
self.velocity = vInitial['value']
self.velocityStdDev = np.sqrt(vInitial['var'])
if not aInitial:
self.navVectorLength = 1
navVector = np.zeros(1)
navVector[0] = vInitial['value']
navVar = np.zeros([1,1])
navVar[0,0] = vInitial['var']
elif not gradInitial:
self.acceleration = aInitial['value']
self.accelerationStdDev = np.sqrt(aInitial['var'])
self.navVectorLength = 2
navVector = np.zeros(2)
navVector[0] = vInitial['value']
navVector[1] = aInitial['value']
navVar = np.zeros([2,2])
navVar[0,0] = vInitial['var']
navVar[1,1] = aInitial['var']
else:
self.acceleration = aInitial['value']
self.accelerationStdDev = np.sqrt(aInitial['var'])
self.gradient = gradInitial['value']
self.gradientStdDev = np.sqrt(gradInitial['var'])
self.navVectorLength = 3
navVector = np.zeros(3)
navVector[0] = vInitial['value']
navVector[1] = aInitial['value']
navVector[2] = gradInitial['value']
navVar = np.zeros([3,3])
navVar[0,0] = vInitial['var']
navVar[1,1] = aInitial['var']
navVar[2,2] = gradInitial['var']
stateVector = np.append(stateVector,navVector)
svCovariance = block_diag(svCovariance, navVar)
svCovariance = covarianceContainer(
svCovariance,
covarianceStorage
)
self.mostRecentF = np.eye(self.__filterOrder__)
self.stateVector = stateVector
super().__init__(
stateDimension=len(stateVector),
stateVectorHistory={
't': t,
'stateVector': stateVector,
'covariance': svCovariance,
'aPriori': aPriori,
'signalTDOA': signalTDOA,
'TDOAVar': TDOAVar,
'xAxis': self.xAxis,
'stateVectorID': -1
},
storeLastStateVectors=storeLastStateVectors
)
self.tdoaStdDevThreshold = tdoaStdDevThreshold
self.velStdDevThreshold = velStdDevThreshold
self.tdoaNoiseScaleFactor = tdoaNoiseScaleFactor
self.velocityNoiseScaleFactor = velocityNoiseScaleFactor
if self.INF_type == 'external':
self.navState = self.internalNavFilter.subStates['oneDPositionVelocity']['stateObject']
return
##
# @name Mandatory SubState Functions
# The following functions are required in order for this class to be used
# as a substate in ModularFilter. The inside of the functions may be
# changed or updated, but their "black box" behavior must remain the
# same; i.e. they must still perform the same essential functions and
# return the same things.
# @{
## @fun #storeStateVector stores an updated estimate of the state vector
[docs] def storeStateVector(
self,
svDict
):
# Unpack updated state vector values
self.t = svDict['t']
self.aPriori = svDict['aPriori']
# Compute new estimate of delay based on new state vector, store in
# svDict and local attributes
if not svDict['aPriori']:
self.correlationVector = svDict['stateVector'][0:self.__filterOrder__]
self.correlationVectorCovariance = svDict['covariance']
self.stateVector = svDict['stateVector']
if self.peakEstimator == 'UK':
tdoaDict = self.estimateSignalTDOA_UT(
self.correlationVector,
self.correlationVectorCovariance
)
elif self.peakEstimator == 'EK':
tdoaDict = self.estimateSignalTDOA_EK(
self.correlationVector,
self.correlationVectorCovariance
)
else:
raise ValueError('Unrecougnized peak finding algorithm %s' %self.peakEstimator)
newTDOA = (
(
tdoaDict['meanTDOA']
) *
self.__dT__
) + self.peakCenteringDT
newTDOAVar = tdoaDict['varTDOA'] * np.square(self.__dT__)
if not isnan(newTDOA) and not isnan(newTDOAVar):
self.signalTDOA = newTDOA
self.TDOAVar = newTDOAVar
svDict['signalTDOA'] = self.signalTDOA
svDict['TDOAVar'] = self.TDOAVar
if self.INF_type == 'external':
if (
(np.sqrt(self.TDOAVar) < (self.tdoaStdDevThreshold))
or (self.tdoaStdDevThreshold == 0)
):
self.internalNavFilter.measurementUpdateEKF(
{'position': {
'value': self.signalTDOA,
'var': self.TDOAVar * self.tdoaNoiseScaleFactor
}},
'oneDPositionVelocity'
)
else:
self.internalNavFilter.measurementUpdateEKF(
{}, ''
)
if self.peakLock is True and self.centerPeak is True:
self.peakOffsetFromCenter = tdoaDict['meanTDOA'] - self.__halfLength__ + 1
# self.peakOffsetFromCenter = np.mod(tdoaDict['meanTDOA'], self.__dT__)
# print(self.peakOffsetFromCenter)
else:
self.peakOffsetFromCenter = 0
else:
# if self.peakLock is True and self.centerPeak is True:
# svDict['stateVector'] = self.correlationVector
# else:
# self.correlationVector = svDict['stateVector']
# if self.peakOffsetFromCenter != 0:
# FLDict = self.buildFLMatrices(
# -self.peakOffsetFromCenter*self.__dT__,
# self.correlationVector
# )
# self.correlationVector = FLDict['F'].dot(self.correlationVector)
# self.peakOffsetFromCenter = 0
# self.correlationVector = svDict['stateVector'][0:self.__filterOrder__]
self.correlationVector = self.mostRecentF.dot(self.correlationVector)
svDict['stateVector'][0:self.__filterOrder__] = self.correlationVector
self.stateVector = svDict['stateVector']
self.correlationVectorCovariance = svDict['covariance']
if self.peakEstimator == 'UK':
tdoaDict = self.estimateSignalTDOA_UT(
self.correlationVector,
self.correlationVectorCovariance
)
elif self.peakEstimator == 'EK':
tdoaDict = self.estimateSignalTDOA_EK(
self.correlationVector,
self.correlationVectorCovariance
)
else:
raise ValueError('Unrecougnized peak finding algorithm %s' %self.peakEstimator)
# newTDOA = (
# (
# tdoaDict['meanTDOA']
# ) *
# self.__dT__
# ) + self.peakCenteringDT
newTDOAVar = tdoaDict['varTDOA'] * np.square(self.__dT__)
# self.signalTDOA = newTDOA
self.TDOAVar = newTDOAVar
svDict['signalTDOA'] = self.signalTDOA
svDict['TDOAVar'] = self.TDOAVar
self.peakOffsetFromCenter = 0
svDict['xAxis'] = self.xAxis + self.peakCenteringDT
# svDict['xAxis'] = self.xAxis - self.signalTDOA
tdoaSTD = np.sqrt(self.TDOAVar)
if tdoaSTD < (self.peakLockThreshold * self.__dT__):
if not self.peakLock:
print(
'Substate %s reached peak lock at time %s'
%(self.__trueSignal__.name, self.t)
)
self.peakLock = True
else:
if self.peakLock and tdoaSTD > (self.peakLockThreshold * self.__dT__ * 1.1):
print(
'Substate %s lost peak lock at time %s'
%(self.__trueSignal__.name, self.t)
)
self.peakLock = False
self.peakOffsetFromCenter = 0
if self.INF_type == 'deep':
fO = self.__filterOrder__
currentV = self.stateVector[fO]
currentVStdDev = np.sqrt(self.correlationVectorCovariance[fO,fO].value)
self.velocity = currentV
self.velocityStdDev = currentVStdDev
svDict['velocity'] = {'value':currentV, 'stddev': currentVStdDev}
if self.navVectorLength == 2:
currentA = self.stateVector[fO+1]
currentAStdDev = np.sqrt(self.correlationVectorCovariance[fO+1,fO+1].value)
svDict['acceleration'] = {'value':currentA, 'stddev': currentAStdDev}
self.acceleration = currentA
self.accelerationStdDev = currentAStdDev
elif self.navVectorLength == 3:
currentA = self.stateVector[fO+1]
currentAStdDev = np.sqrt(self.correlationVectorCovariance[fO+1,fO+1].value)
svDict['acceleration'] = {'value':currentA, 'stddev': currentAStdDev}
self.acceleration = currentA
self.accelerationStdDev = currentAStdDev
currentGrad = self.stateVector[fO+2]
currentGradStdDev = np.sqrt(self.correlationVectorCovariance[fO+2,fO+2].value)
svDict['aGradient'] = {'value':currentGrad, 'stddev': currentGradStdDev}
self.gradient = currentGrad
self.gradientStdDev = currentGradStdDev
elif self.INF_type == 'external':
self.velocity = self.navState.currentVelocity
self.velocityStdDev = np.sqrt(self.navState.velocityVar)
super().storeStateVector(svDict)
return
## @fun #timeUpdate returns the matrices for performing the correlation
# vector time update.
#
# @details This function calls the #buildTimeUpdateMatrices method to
# generate the time-update matrices.
#
# @param self The object pointer
# @param dT The amount of time ellapsed over which the time update is to
# be performed
# @param dynamics A dictionary containing the dynamics for the time update
# (e.g. velocity)
#
# @sa SubStates.SubState.timeUpdate
[docs] def timeUpdate(
self,
dT,
dynamics=None
):
if self.INF_type != 'deep':
timeUpdateMatrices = self.buildTimeUpdateMatrices(
dT, dynamics, self.correlationVector
)
L = timeUpdateMatrices['L']
Q = timeUpdateMatrices['Q']
Qmat = (
np.outer(L, L) * Q +
(
np.eye(self.__filterOrder__) *
self.processNoise * dT *
np.square(self.__trueSignal__.peakAmplitude * self.__dT__)
)
)
if dynamics is not None and 'acceleration' in dynamics:
oneDAcceleration = (
dynamics['acceleration']['value'].dot(self.__unitVecToSignal__) /
self.speedOfLight()
)
oneDAccelerationVar = (
self.__unitVecToSignal__.dot(
dynamics['acceleration']['value'].dot(
self.__unitVecToSignal__.transpose()
)
) / np.square(self.speedOfLight())
)
else:
oneDAcceleration = 0
oneDAccelerationVar = self.navProcessNoise
if self.INF_type == 'external':
self.internalNavFilter.timeUpdateEKF(
dT,
dynamics = {
'oneDPositionVelocityacceleration': {
'value': oneDAcceleration,
'var': oneDAccelerationVar
}
}
)
else:
timeUpdateMatrices = self.buildDeepTimeUpdateMatrices(dT, dynamics, self.correlationVector)
# if dynamics is not None and 'accelerationGrad' in dynamics:
# navProcessNoise = (
# dynamics['accelerationGrad']['value'].dot(self.__unitVecToSignal__) /
# self.speedOfLight()
# )
# oneDAccelerationGradVar = (
# self.__unitVecToSignal__.dot(
# dynamics['accelerationGrad']['value'].dot(
# self.__unitVecToSignal__.transpose()
# )
# ) / np.square(self.speedOfLight())
# )
# else:
L = timeUpdateMatrices['L']
Qmat = (
np.outer(L, L) * self.navProcessNoise + (
(
block_diag(np.eye(self.__filterOrder__),np.zeros([self.navVectorLength,self.navVectorLength])) *
self.processNoise * dT *
np.square(self.__trueSignal__.flux * self.__dT__)
)
)
)
self.mostRecentF = timeUpdateMatrices['F'][0:self.__filterOrder__, 0:self.__filterOrder__]
return {'F': timeUpdateMatrices['F'], 'Q': Qmat}
[docs] def buildDeepTimeUpdateMatrices(self,dT, dynamics, h):
FMatrixShift = -self.peakOffsetFromCenter
filterOrder = self.__filterOrder__
# Initialize empty matricies
F = np.zeros([filterOrder + self.navVectorLength, filterOrder+self.navVectorLength])
halfLength = self.__halfLength__
indexDiff = dT/self.__dT__
peakShift = self.stateVector[self.__filterOrder__] * indexDiff
# Velocity term
self.peakCenteringDT = (
self.peakCenteringDT + self.stateVector[self.__filterOrder__] * dT
)
if self.navVectorLength > 1:
# Acceleration term (if acceleration is being estimated)
self.peakCenteringDT = (
self.peakCenteringDT +
self.stateVector[self.__filterOrder__ + 1] * np.power(dT,2)/2
)
peakShift = (
peakShift + self.stateVector[self.__filterOrder__ + 1]*np.power(indexDiff,2)/2
)
if self.navVectorLength > 2:
# Acceleration gradient term
self.peakCenteringDT = (
self.peakCenteringDT +
self.stateVector[self.__filterOrder__] *
self.stateVector[self.__filterOrder__ + 2] *
np.power(dT,3)/6
)
peakShift = (
peakShift +
self.stateVector[self.__filterOrder__] *
self.stateVector[self.__filterOrder__ + 2] *
np.power(indexDiff,3)/6
)
self.peakCenteringDT = self.peakCenteringDT + (self.peakOffsetFromCenter*self.__dT__)
# Build arrays of indicies from which to form the sinc function
if np.mod(filterOrder, 2) == 0:
baseVec = (
np.linspace(
1 - halfLength,
halfLength,
filterOrder
)
)
else:
baseVec = (
np.linspace(
1 - halfLength,
halfLength - 1,
filterOrder
)
)
# Compute the sinc function of the base vector
sincBase = np.sinc(baseVec + FMatrixShift)
diffBase = np.zeros_like(sincBase)
for i in range(len(baseVec)):
diffBase[i] = self.sincDiff(baseVec[i] + peakShift)
sincBase = np.roll(sincBase, 1 - int(halfLength))
diffBase = np.roll(diffBase, 1 - int(halfLength))
for i in range(len(sincBase)):
currentDiff = np.roll(diffBase, i).dot(h)
F[i, 0:filterOrder] = np.roll(sincBase, i)
F[i, filterOrder] = currentDiff * indexDiff
if self.navVectorLength > 1:
F[i, filterOrder+1] = currentDiff * np.power(indexDiff, 2)/2
if self.navVectorLength > 2:
F[i, filterOrder+2] = (
currentDiff *
self.stateVector[filterOrder] *
np.power(indexDiff, 3)/6
)
L = np.zeros(filterOrder+self.navVectorLength)
if self.navVectorLength == 1:
F[filterOrder, filterOrder] = 1
L[filterOrder] = dT
elif self.navVectorLength == 2:
F[filterOrder, filterOrder] = 1.0
F[filterOrder, filterOrder+1] = dT
F[filterOrder+1, filterOrder+1] = 1.0
L[filterOrder] = np.power(dT, 2)/2
L[filterOrder + 1] = dT
elif self.navVectorLength == 3:
vCurrent = self.stateVector[filterOrder]
aCurrent = self.stateVector[filterOrder + 1]
gradCurrent = self.stateVector[filterOrder + 2]
F[filterOrder,filterOrder] = 1 + (gradCurrent * np.power(dT,2)/2)
F[filterOrder,filterOrder+1] = dT
F[filterOrder,filterOrder+2] = vCurrent * np.power(dT,2)/2
F[filterOrder+1,filterOrder+1] = 1
F[filterOrder+1,filterOrder+2] = vCurrent * dT
F[filterOrder+2,filterOrder+2] = 1
L[filterOrder] = vCurrent * np.power(dT,3)/6
L[filterOrder + 1] = vCurrent * np.power(dT,2)/2
L[filterOrder + 2] = dT
diffBase = np.zeros_like(sincBase)
for i in range(len(baseVec)):
diffBase[i] = self.sincDiff(baseVec[i])
diffBase = np.roll(diffBase, 1 - int(halfLength))
for i in range(len(baseVec)):
L[i] = (
np.roll(diffBase, i).dot(h) *
np.power(indexDiff,self.navVectorLength+1)/factorial(self.navVectorLength+1)
)
# # Setting L to zero for test purposes only
# L = np.zeros(filterOrder+self.navVectorLength)
return({'F':F, 'L':L})
[docs] def getMeasurementMatrices(
self,
measurement,
source=None
):
if (
(source.signalID() == self.__trueSignal__.signalID()) and
('t' in measurement)
):
measurementMatrices = self.getTOAMeasurementMatrices(
measurement,
self.correlationVector
)
HDict = {'correlationVector': measurementMatrices['H']}
RDict = {'correlationVector': measurementMatrices['R']}
dyDict = {'correlationVector': measurementMatrices['dY']}
else:
HDict = {'': None}
RDict = {'': None}
dyDict = {'': None}
measurementMatricesDict = {
'H': HDict,
'R': RDict,
'dY': dyDict
}
return measurementMatricesDict
## @}
## @fun #buildTimeUpdateMatrices constructs the correlation vector time
# update matrices
#
# @details The #buildTimeUpdateMatrices method constructs the matrices required to perform the time update of the correlation vector sub-state.
#
# The time update matrices are a function of the estimated spacecraft velocity (\f$\mathbf{v}\f$), velocity variance (\f$\mathbf{P}_{\mathbf{v}}\f$), and the elapsed time over which the time update occurs (\f$\Delta T\f$). The matrices are constructed as follows:
#
# \f[
# \mathbf{F}_{j \to k} = \begin{bmatrix}
# \textrm{sinc}(\hat{\delta}) & \hdots & \textrm{sinc}(\hat{\delta} + N - 1) \\
# \vdots & \ddots & \vdots \\
# \textrm{sinc}(\hat{\delta} - N + 1) & \hdots & \textrm{sinc}(\hat{\delta})
# \end{bmatrix}
# \f]
#
# \f[
# \mathbf{L}_{j} = \begin{bmatrix}
# \frac{\textrm{cos}}{(\hat{\delta})} - \frac{\textrm{sin}}{(\hat{\delta}^2)} & \hdots \\
# \vdots & \ddots \\
# \end{bmatrix} \sv[timeIndex = k]
# \f]
#
# \f[
# Q_{\delta} = \left(\frac{(k-j)}{c}\right)^2
# {\LOSVec[S]}^T \mathbf{P}_{\mathbf{v}} \LOSVec[S]
# \f]
#
# where
#
# \f[
# \hat{\delta}_{j \to k} = \frac{\mathbf{v} \LOSVec[S] \Delta T}{c T}
# \f]
#
# @param self The object pointer
# @param deltaT The amount of time over which the time update is occuring
# @param dynamics A dictionary containing the relevant dynamics for the
# time update
# @param h The current correlation vector
#
# @returns A dictionary containing the matrices \f$\mathbf{F}\f$,
# \f$\mathbf{L}\f$, and the scalar \f$Q\f
[docs] def buildTimeUpdateMatrices(
self,
deltaT,
dynamics,
h
):
indexDiff = deltaT/self.__dT__
if (
(dynamics is not None and 'velocity' in dynamics) or
(
self.INF_type == 'external' and
(
(np.sqrt(self.navState.velocityVar) < self.velStdDevThreshold) or
self.velStdDevThreshold == 0
)
)
):
if 'velocity' in dynamics:
velocity = dynamics['velocity']['value']
vVar = dynamics['velocity']['var'] * self.velocityNoiseScaleFactor
peakShift = (
(velocity.dot(self.__unitVecToSignal__) * indexDiff) /
self.speedOfLight()
)
# velocityTDOA = peakShift * self.__dT__
velocityTDOA = (
velocity.dot(self.__unitVecToSignal__) * deltaT /
self.speedOfLight()
)
Q = (
self.__unitVecToSignal__.dot(
vVar
).dot(self.__unitVecToSignal__) *
np.square(indexDiff / self.speedOfLight())
)
tdoaQ = (
self.__unitVecToSignal__.dot(vVar
).dot(self.__unitVecToSignal__) *
np.square(deltaT/self.speedOfLight()))
elif self.INF_type=='external':
peakShift = self.navState.currentVelocity * indexDiff
velocityTDOA = self.navState.currentVelocity * deltaT
Q = self.navState.velocityVar * np.square(indexDiff) * self.velocityNoiseScaleFactor
tdoaQ = self.navState.velocityVar * np.square(deltaT) * self.velocityNoiseScaleFactor
else:
velocityTDOA = 0
peakShift = 0
Q = self.navProcessNoise * np.power(indexDiff,4)/4
tdoaQ = self.navProcessNoise * np.power(deltaT,4)/4
FMatrixShift = -self.peakOffsetFromCenter # - peakShift
self.signalTDOA = (
self.signalTDOA +
velocityTDOA
)
self.TDOAVar = self.TDOAVar + tdoaQ
self.peakCenteringDT = (
self.peakCenteringDT + velocityTDOA +
(self.peakOffsetFromCenter*self.__dT__)
)
# Initialize empty matricies
F = np.zeros([self.__filterOrder__, self.__filterOrder__])
L = np.zeros([self.__filterOrder__, self.__filterOrder__])
# Build arrays of indicies from which to form the sinc function
if np.mod(self.__filterOrder__, 2) == 0:
baseVec = (
np.linspace(
1 - self.__halfLength__,
self.__halfLength__,
self.__filterOrder__
)
)
else:
baseVec = (
np.linspace(
1 - self.__halfLength__,
self.__halfLength__ - 1,
self.__filterOrder__
)
)
# Compute the sinc function of the base vector
sincBase = np.sinc(baseVec + FMatrixShift)
diffBase = np.zeros_like(sincBase)
for i in range(len(baseVec)):
diffBase[i] = self.sincDiff(baseVec[i] + peakShift)
sincBase = np.roll(sincBase, 1 - int(self.__halfLength__))
diffBase = np.roll(diffBase, 1 - int(self.__halfLength__))
for i in range(len(F)):
F[i] = np.roll(sincBase, i)
L[i] = np.roll(diffBase, i)
L = L.dot(h)
# else:
# # If no velocity was included in dynamics, then do nothing during
# # time update
# F = np.eye(self.__filterOrder__)
# L = np.zeros(self.__filterOrder__)
# Q = 0
timeUpdateDict = {
'F': F,
'L': L,
'Q': Q
}
return(timeUpdateDict)
[docs] def buildFLMatrices(self, peakShift, h):
# Initialize empty matricies
F = np.zeros([self.__filterOrder__, self.__filterOrder__])
L = np.zeros([self.__filterOrder__, self.__filterOrder__])
# Build arrays of indicies from which to form the sinc function
if np.mod(self.__filterOrder__, 2) == 0:
baseVec = (
np.linspace(
1 - self.__halfLength__,
self.__halfLength__,
self.__filterOrder__
)
)
else:
baseVec = (
np.linspace(
1 - self.__halfLength__,
self.__halfLength__ - 1,
self.__filterOrder__
)
)
# Compute the sinc function of the base vector
sincBase = np.sinc(baseVec + peakShift)
diffBase = np.zeros_like(sincBase)
for i in range(len(baseVec)):
diffBase[i] = self.sincDiff(baseVec[i] + peakShift)
sincBase = np.roll(sincBase, 1 - int(self.__halfLength__))
diffBase = np.roll(diffBase, 1 - int(self.__halfLength__))
for i in range(len(F)):
F[i] = np.roll(sincBase, i)
L[i] = np.roll(diffBase, i)
L = L.dot(h)
return {'F':F, 'L':L}
## @}
## @{
# @name Functions Specific to #CorrelationVector
#
# The following remaining functions are not required in order for this
# class to be used as a SubState, and may be changed as needed,
# including inputs and outputs.
[docs] def getTOAMeasurementMatrices(
self,
measurement,
corrVec
):
photonTOA = measurement['t']['value']
adjustedTOA = photonTOA + self.peakCenteringDT
H = np.eye(self.__filterOrder__)
if self.INF_type == 'deep':
H = np.append(H, np.zeros([self.__filterOrder__, self.navVectorLength]), axis=1)
timeVector = np.linspace(
0,
(self.__filterOrder__ - 1),
self.__filterOrder__
)
timeVector = timeVector * self.__dT__
timeVector = (
timeVector + adjustedTOA
)
# if self.peakLock is True:
# timeVector = timeVector - self.signalDelay
signalTimeHistory = np.zeros(self.__filterOrder__)
halfDT = self.__dT__/2.0
# for timeIndex in range(len(timeVector)):
# signalTimeHistory[timeIndex] = (
# self.__trueSignal__.getSignal(timeVector[timeIndex]) *
# self.__dT__
# )
for timeIndex in range(len(timeVector)):
signalTimeHistory[timeIndex] = (
self.__trueSignal__.signalIntegral(
timeVector[timeIndex]-halfDT,
timeVector[timeIndex] + halfDT
)
)
# plt.plot(signalTimeHistory)
# plt.show(block=False)
# 1/0
# print(corrVec)
# print(signalTimeHistory)
dY = signalTimeHistory - corrVec
R = (
np.eye(self.__filterOrder__) *
#self.__trueSignal__.flux *
self.__trueSignal__.peakAmplitude *
self.__dT__ *
np.dot(corrVec, corrVec) *
self.measurementNoiseScaleFactor
)
measMatDict = {
'H': H,
'dY': dY,
'R': R
}
return measMatDict
## @fun #computeSignalTDOA computes the delay between the #__trueSignal__ and
# measurements based on a correlation vector
#
# @details The #computeSignalDelay function is a rudimentary function
# which takes an estimate of the correlation vector and uses it to
# estimate the location of the peak. It functions by finding the tap with
# the maximum value, and then fitting a quadratic to the points
# surrounding the maximum value tap. The number of points to which the
# quadratic is fitted is determined by the value of #peakFitPoints; the
# number of points is equal to \f$2 * n + 1\f$ where \f$n = \f$
# #peakFitPoints.
#
# The delay estimate that is returned is in units of #__dT__. So, a returned
# value of 2 would imply that the peak is located at 2, and therefore the
# delay corresponding to the correlation vector is 2 #__dT__.
#
# The returned delay may not include previously accumulated #signalDelay
# between the signal and the measurements. See the #storeStateVector
# function for more information on how the #signalDelay is stored and
# accumulated delay is accounted for.
#
# @param self The object pointer
# @param c The correlation vector
# @param P the correlation vector covariance matrix
#
# @return The estimate of the delay
[docs] def computeSignalTDOA(
self,
c,
P
):
# First estimate of peak location is the location of the max value
peakLocation = np.argmax(c)
# Next, we "roll" the correlation vector so that the values being
# fitted quadratically are the first 2 * peakFitPoints + 1 values
lowerBound = peakLocation - self.peakFitPoints
upperBound = lowerBound + (self.peakFitPoints * 2) + 1
if (lowerBound < 0) or (upperBound > self.__filterOrder__):
mySlice = range(lowerBound, upperBound)
slicedC = c.take(mySlice, mode='wrap')
slicedP = P.take(mySlice, axis=0, mode='wrap').take(mySlice, axis=1, mode='wrap')
else:
mySlice = slice(lowerBound, upperBound)
slicedC = c[mySlice]
slicedP = P[mySlice, mySlice]
# xVec is the vector of "x" values corresponding the "y" values to
# which the quadratic is being fit.
xVec = self.__xVec__
# xVec = xVec - rollFactor
xVec = xVec + lowerBound
# Get the quadratic function that fits the peak and surrounding values,
# and use it to estimate the location of the max
# print(slicedC)
if len(xVec) == 3:
TDOA = self.peakFinder(xVec, slicedC)
else:
quadraticVec = self.quadraticFit(xVec, slicedC)
try:
TDOA = (-quadraticVec[1] / (2 * quadraticVec[0]))
except:
TDOA = xVec[peakLocation]
return TDOA
## @fun #estimateSignalTDOA_UT uses a unscented tranform to estimate the
# delay corresponding to a correlation vector
#
# @details The #estimateSignalDelayUT method is responsible for computing
# the estimated value of delay corresponding to a correlation vector, as
# well as the variance of that estimate. These values are computed using
# a unscented transform (i.e. sigma-point) approach.
#
# The method receives the an estimate of the correlation vector, as well
# as the covariance matrix corresponding to that vector. From there it
# computes a set of n sigma points (where n is the length of the
# correlation vector), and for each of the generated sigma point vectors,
# it computes the peak location using the #computeSignalDelay method.
#
# @param self The object pointer
# @param h The correlation vector
# @param P The correlation vector covariance matrix
#
# @returns A dict containing the estimate of the peak location
# ("meanDelay") and the estimate variance ("varDelay")
[docs] def estimateSignalTDOA_UT(
self,
h,
P,
useMean=True
):
# Compute sigma points
hDimension = len(h)
maxHIndex = np.argmax(h)
rollAmount = -maxHIndex + self.__halfLength__
# rollAmount = 1
# hRolled = np.roll(h, rollAmount)
# PRolled = np.roll(np.roll(P.value, rollAmount, axis=0), rollAmount, axis=1)
# Compute the square root of P.
if P.form == 'covariance':
sqrtP = np.linalg.cholesky(
hDimension * P.value[0:self.__filterOrder__, 0:self.__filterOrder__]
)
elif P.form == 'cholesky':
# PVal = P.convertCovariance('covariance').value
# sqrtP = np.linalg.cholesky(hDimension * PVal)
sqrtP = P.value[0:self.__filterOrder__, 0:self.__filterOrder__] * np.sqrt(hDimension)
sigmaPoints = h + np.append(sqrtP, -sqrtP, axis=0)
# Append one more row of sigma points containing the unmodified estimate
sigmaPoints = np.append(np.array([h]), sigmaPoints, axis=0)
# Initiate vector to store the resulting peaks from each sigma point
sigmaPointResults = np.zeros(len(sigmaPoints))
# Compute the peak corresponding to each sigma point vector
for i in range(len(sigmaPoints)):
sigmaPointResults[i] = (
self.computeSignalTDOA(sigmaPoints[i], P.convertCovariance('covariance').value)
)
#meanTDOA = np.mean(sigmaPointResults)
meanTDOA = sigmaPointResults[0]
for i in range(len(sigmaPoints)):
if (meanTDOA - sigmaPointResults[i]) > self.__halfLength__:
sigmaPointResults[i] += self.__dimension__
elif (sigmaPointResults[i] - meanTDOA) > self.__halfLength__:
sigmaPointResults[i] -= self.__dimension__
# meanTDOA = self.computeSignalTDOA(h, P)
varTDOA = np.var(sigmaPointResults)
return {'meanTDOA': meanTDOA, 'varTDOA': varTDOA, 'sigmaPoints': sigmaPointResults}
[docs] def estimateSignalTDOA_EK(self, h, P):
if P.form == 'covariance':
P = P.value[0:self.__filterOrder__, 0:self.__filterOrder__]
elif P.form == 'cholesky':
P = P.convertCovariance('covariance').value[0:self.__filterOrder__, 0:self.__filterOrder__]
# First estimate of peak location is the location of the max value
peakLocation = np.argmax(h)
# Next, we "roll" the correlation vector so that the values being
# fitted quadratically are the first 3 values
lowerBound = peakLocation - 1
upperBound = lowerBound + (1 * 2) + 1
if (lowerBound < 0) or (upperBound > self.__filterOrder__):
mySlice = range(lowerBound, upperBound)
slicedC = h.take(mySlice, mode='wrap')
slicedP = P.take(mySlice, axis=0, mode='wrap').take(mySlice, axis=1, mode='wrap')
else:
mySlice = slice(lowerBound, upperBound)
slicedC = h[mySlice]
slicedP = P[mySlice, mySlice]
# xVec is the vector of "x" values corresponding the "y" values to
# which the quadratic is being fit.
xVec = self.__xVec__
xVec = xVec + lowerBound
# Get the quadratic function that fits the peak and surrounding values,
# and use it to estimate the location of the max
TDOA = self.peakFinder(xVec, slicedC)
jacobian = self.peakFinderJacobian(xVec, slicedC)
variance = jacobian.dot(slicedP).dot(jacobian.transpose())
return {'meanTDOA': TDOA, 'varTDOA': variance}
[docs] def speedOfLight(
self
):
return (299792)
[docs] @staticmethod
def sincDiff(x):
if np.abs(x) < 1e-100:
myDiff = 0.0
else:
piX = np.pi*x
# myDiff = np.pi * (
# (((np.pi * x) * np.cos(x * np.pi)) - np.sin(x * np.pi))
# /
# np.square(x * np.pi)
# )
myDiff = (piX*np.cos(piX) - np.sin(piX))/(np.pi * np.power(x,2))
# myDiff
return myDiff
[docs] @staticmethod
def quadraticFit(x, y):
X_T = np.array([np.power(x, 2), x, np.ones(len(x))])
X = X_T.transpose()
if len(x) < 3:
raise ValueError(
"Cannot fit a quadratic to less than three data points."
)
elif len(x) == 3:
# Note: Suprisingly, it is faster to directly invert the X matrix
# than it is to do a linear solve. Strange.
#coef = np.linalg.solve(X, y)
coef = np.linalg.inv(X).dot(y)
else:
#coef = np.linalg.solve(X_T.dot(X).dot(X_T), y)
coef = np.linalg.inv(X_T.dot(X)).dot(X_T).dot(y)
return coef
[docs] def initializeRealTimePlot(
self,
plotHandle=None,
axisHandle=None
):
super().initializeRealTimePlot(plotHandle, axisHandle)
self.RTPlotTDOA = self.RTPaxisHandle.scatter(
self.signalTDOA,
1
)
self.RTPlotTDOA_error, = self.RTPaxisHandle.plot(
[
self.signalTDOA - np.sqrt(self.TDOAVar),
self.signalTDOA + np.sqrt(self.TDOAVar)
],
[1,1]
)
return
[docs] def realTimePlot(
self,
normalized=True
):
if self.RTPlotHandle is None:
self.initializeRealTimePlot()
self.RTPlotTDOA.set_offsets([self.signalTDOA, 1])
self.RTPlotTDOA_error.set_data(
[
self.signalTDOA - np.sqrt(self.TDOAVar),
self.signalTDOA + np.sqrt(self.TDOAVar)
],
[1,1]
)
super().realTimePlot(normalized, substateRange = slice(0,self.__filterOrder__))
return
[docs] @staticmethod
def peakFinder(x,y):
x1 = x[0]
x2 = x[1]
x3 = x[2]
y1 = y[0]
y2 = y[1]
y3 = y[2]
x0 = (
-(y1*(np.square(x3) - np.square(x2)) + y2*(np.square(x1) - np.square(x3)) + y3*(np.square(x2) - np.square(x1)))
/
(2*(y1*(x2-x3) + y2*(x3-x1) + y3*(x1-x2)))
)
x0 = (
(
y1*(np.square(x2)-np.square(x3)) +
y2*(np.square(x3)-np.square(x1)) +
y3*(np.square(x1)-np.square(x2))
)
/
(2*(y1*(x2-x3) + y2*(x3-x1) + y3*(x1-x2)))
)
return(x0)
[docs] @staticmethod
def peakFinderJacobian(x,y):
x1 = x[0]
x2 = x[1]
x3 = x[2]
y1 = y[0]
y2 = y[1]
y3 = y[2]
A = np.square(x2) - np.square(x3)
B = np.square(x3) - np.square(x1)
C = np.square(x1) - np.square(x2)
D = x2-x3
E = x3-x1
# E = x1-x2
F = x1-x2
AE = A*E
AF = A*F
BD = B*D
BF = B*F
CD = C*D
CE = C*E
denom = 2*np.power(((D*y1) + (E*y2) + (F*y3)),2)
dT_dy1 = (
((AE - BD)*y2 + (AF - CD)*y3)
/
denom
)
dT_dy2 = (
((BD - AE)*y1 + (BF - CE)*y3)
/
denom
)
dT_dy3 = (
((CD - AF)*y1 + (CE - BF)*y2)
/
denom
)
return np.array([dT_dy1, dT_dy2, dT_dy3])