Source code for modest.substates.substate

## @module SubState
# This file contains the SubState class.


from abc import ABCMeta, abstractmethod
import numpy as np
# import matplotlib as mp
import matplotlib.pyplot as plt
from .. utils import covarianceContainer

[docs]class SubState(): r""" This is an abstract base class for objects used as sub-states in State.ModularFilter. SubState is an abstract base class that specifies the methods which are required for an object to function as a sub-state of State.ModularFilter. Some of these methods are implemented and most likely do not need to be reimplemented in a derived class implementation (for example the #dimension and #covariance methods. Other methods may have a rudimentary implementation that may be suitable for some derived classes, but not others, depending on the specific functionality of the derived class (for instance #getStateVector and #storeStateVector). Finally, some methods are specifically tagged as abstract methods and are not implemented at all. These methods must be implemented in the derived class. This is usually because there is no way to implement even a rudimentary version of what the method is supposed to do without having some knowledge of what kind of substate the derived class contains (for instance :meth:`timeUpdate` and :meth:`getMeasurementMatrices`). In any case, the documentation for each method of SubState contains a generalized description of what functionality the implementation should provide in a derived class. """ __metaclass__ = ABCMeta nextSubstateObjectID = 0 ## @fun #__init__ initializes a SubState object # # @details The #__init__ method is responsible for initializing a # generalized SubState object. The essential functions of #__init__ are # to store the dimension of the state, and to initialize a time-history of # the state in a SmartPanda object. # # If no values are passed for the initial state estimate dictionary, they # will be initialized to the following default values. # # - 'stateVector': A length #dimension array of zeros # - 'covariance': An (#dimension x #dimension) identity matrix # - 't': 0 # # @param self The object pointer # @param stateDimension The dimension of the sub-state state vector # @param stateVectorHistory A dictionary containing the initial state. def __init__( self, stateDimension=None, stateVectorHistory=None, storeLastStateVectors=0, objectID='' ): if stateDimension is None: stateDimension = len(stateVectorHistory['stateVector']) ## @brief Stores the length of the state vector as seen by # the ModularFilter. See the #dimension function for details on # implementation. self.__dimension__ = stateDimension if stateVectorHistory is None: stateVectorHistory = { 't': 0, 'stateVector': np.zeros(stateDimension), 'covariance': np.eye(stateDimension), 'stateVectorID': 0 } # Check to verify that the dictionary contains the correct keys if 't' not in stateVectorHistory: raise ValueError( "State vector history must contain time key, labeled \"t\"" ) if 'stateVector' not in stateVectorHistory: raise ValueError( "State vector history must contain state vector key, labeled" + "\"stateVector\"" ) if 'stateVectorID' not in stateVectorHistory: raise ValueError( "State vector history must contain state vector id key, labeled \"stateVectorID\"" ) if 'covariance' not in stateVectorHistory: raise ValueError( "State vector history must contain covariance matrix labeled \"covariance\"" ) if len(stateVectorHistory['stateVector']) != self.__dimension__: raise ValueError( "\"stateVector\" in stateVectorHistory must be of same length as state dimension" ) if ( not all([dim==self.__dimension__ for dim in stateVectorHistory['covariance'].shape]) ) or len(stateVectorHistory['covariance'].shape)!=2: raise ValueError( "\"covariance\" must be a %i x %i matrix" %(self.__dimension__, self.__dimension__) ) if not isinstance(stateVectorHistory['covariance'], covarianceContainer): stateVectorHistory['covariance'] = covarianceContainer(stateVectorHistory['covariance'],'covariance') ## @brief Stores the time-history of the sub-state state vector. # self.stateVectorHistory = SmartPanda(data=stateVectorHistory) self.stateVectorHistory = [stateVectorHistory] self.timeList = [stateVectorHistory['t']] ## @brief Stores handle for real-time plotting self.RTPlotHandle = None self.THPlotHandle = None self.RTPlotData = None self.storeLastStateVectors = storeLastStateVectors """ (int) Determines how far back the state vector history may go. If zero, then the entire state vector history is stored """ self.__objectIDSTR__ = objectID self.__objectID__ = SubState.nextSubstateObjectID SubState.nextSubstateObjectID += 1 return ## # @name Mandatory SubState Functions # The following functions are functions which are required for the # SubState to function as a sub-state in State.ModularFilter. # @{
[docs] def getStateVector(self, t=None): r""" getStateVector returns the most recent value of the state vector The getStateVector method is responsible for returning a dictionary object containing, at minimim, the following items: - 'stateVector': A length :attr:`dimension` array containing the most recent state vector estimate - 'covariance': A (:attr:`dimension` x :attr:`dimension`) array containing the most recent covariance matrix - 'aPriori': A boolean indicating if the most recent estimate is the - result of a time update (aPriori=True) or a measurement update (aPriori=False) This function can be used as-is, or can be overloaded to perform additional tasks specific to the substate. Args: t (int): This is an optional argument if the user wants a state vector from a time other than the current time. Returns: dict: A dictionary containing the state vector, covariance matrix, and aPriori status """ # lastSV = self.stateVectorHistory.getDict(-1) if t is None: stateVector = self.stateVectorHistory[-1] else: timeIndex = np.searchsorted(self.timeList, t) stateVector = self.stateVectorHistory[timeIndex] return(stateVector)
[docs] def storeStateVector(self, svDict): r""" storeStateVector stores the most recent value of the state vector. The storeStateVector method is responsible for storing a dictionary containing the most recent state estimate. In SubState implementation, the functionality is minimal: the new dictionary is simply appeneded to the list of state vector estimates. However, in some derived classes, it may be nescessary to implement additional functionality. This is particularly true if there are derived quantities that need to be calculated from the updated state vector (for instance, calculating the attitude quaternion from the attitude error states). Also in some cases, the actual value of the state vector may need to be "tweaked" by the SubState derived class. If an alternative implementation is written for a derived class, it should still call this implementation, or at least make sure that it stores the current state estimate in :attr:`stateVectorHistory`. Args: svDict (dict): A dictionary containing the current state estimate. """ self.stateVectorHistory.append(svDict) # Check to see if state vector history is too long; if so truncate to # the last storeLastStateVectors values if self.storeLastStateVectors > 0: if len(self.stateVectorHistory) > self.storeLastStateVectors: self.stateVectorHistory = self.stateVectorHistory[ len(self.stateVectorHistory)-self.storeLastStateVectors: ] self.timeList.append(svDict['t']) return
[docs] def covariance(self): r""" :meth:`covariance` returns the SubState covariance matrix The covariance method returns the covariance of the estimate of the substate. todo: Currently, this method only returns the covariance of the most recent state estimate. Ideally, there should be an optional time parameter which would allow the user to get the covaraince matrix at a specified time (or the closest to that specified time). Returns: covarianceContainer: Returns the covaraince matrix """ # return self.stateVectorHistory.getDict(-1)['covariance'] return self.stateVectorHistory[-1]['covariance']
[docs] def dimension( self ): r""" dimension returns the dimension of the sub-state vector The dimension method returns the dimension of the sub-state vector estimated by the SubState. This is the dimension as seen by the ModularFilter estimator. The default implementation is to return the class variable :attr:`__dimension__`, which is saved at initialization. This is designated as a "protected" variable, and should not change during the course of the :class:`SubState`'s lifetime. If child class overwrites this implementation, care should be taken to ensure that the value returned by #dimension does not change over SubState object lifetime. For SubState objects with auxilary states, or other quantities related to the state vector but not directly estimated by the ModularFilter, #dimension should not count these states as part of the total dimension. Returns: int: The dimension of state vector """ return(self.__dimension__)
[docs] @abstractmethod def timeUpdate(self, dT, dynamics=None): r""" timeUpdate returns time-update matrices The :meth:`timeUpdate` method is responsible for returning the EKF time update matrices. Specifically, it returns the state update matrix :math:`\mathbf{F}` and the process noise matrix :math:`\mathbf{Q}`, following the standard `Extended Kalman Filter <https://en.wikipedia.org/wiki/Extended_Kalman_filter>`_ time update equations: .. math:: \mathbf{x}_k^- = \mathbf{F}\mathbf{x}_j^+ .. math:: \mathbf{P}_k^- = \mathbf{F} \mathbf{P}_k^- \mathbf{F}^T + \mathbf{Q} Because these matrices are nescessarily specific to the type of substate being updated, there is no default implementation in the SubState class. Rather, each derived class must implement this method as appropriate for the dynamics of the state being modeled. In addition, some substates may require additional operations to occur at a time update. For instance, if a substate includes auxillary values (for instance, the attitude quaternion derived from the attitude error state), it may need to be time-updated seperately from the other states. In this case, the local implementation of the #timeUpdate function is the place to do these updates. Args: dT (float): The ellapsed time over which the time update occurs dynamics (dict): A dictionary containing any dynamics infomation which may be needed to update the state, for instance, measured accelerations or angular velocities. Returns: (dict) A dictionary containing, at minimum, the following items: - "F": The state time-update matrix - "Q": The process noise matrix """ pass
[docs] @abstractmethod def getMeasurementMatrices(self, measurement, source=None): r""" getMeasurementMatrices returns matrices needed to perform a measurement update The :meth:`getMeasurementMatrices` method is responsible for returning the EKF measurement update matrices. Specifically, it returns the measurement matrix :math:`\mathbf{H}`, the measurement noise matrix :math:`\mathbf{R}`, and the measurement residual vector :math:`\mathbf{\delta}\mathbf{y}`, folloing the standard `Extended Kalman Filter <https://en.wikipedia.org/wiki/Extended_Kalman_filter>`_ measurement update equations: .. math:: \mathbf{\delta y} = \mathbf{y} - h(\mathbf{x}_k^-, \mathbf{w}_k) .. math:: \mathbf{H}_k^-= \frac{\partial h}{ \partial \mathbf{x}} .. math:: \mathbf{S}_k^- = \mathbf{H}_k \mathbf{P}_k^- \mathbf{H}_k^T + \mathbf{R} .. math:: \mathbf{K}_k^- = \mathbf{P}_k^- \mathbf{H}_k^T \mathbf{S}_k^{-1} Because these matrices are nescessarily specific to the type of substate being updated, there is no default implementation in the SubState class. Rather, each derived class must implement this method as appropriate for the measurement of the state being modeled. Additionally, the measurement update may be different depending on which signal source is generating the measurement, so the substate must have measurement matrix generation implemented for all the expected signal sources. In addition, some substates may require additional operations to occur at a measurement update. For instance, if a substate includes auxillary values (for instance, the attitude quaternion derived from the attitude error state), it may need to be updated seperately after the global state vector has been updated. In this case, the local implementation of the :meth:`timeUpdate` function is the place to do these updates. Note that there is no time associated with the measurement; the filter assumes that the measurement is occuring at the current time. Therefore it is the user's responsibility to time-update the state to the current time before doing the measurement update. Args: measurement (dict): A dictionary containing the measurement value(s) source (str): A key uniquely identifying the source of origin of the measurement Returns: (dict) A dictionary containing, at minimum, the following items: - "F": The state time-update matrix - "Q": The process noise matrix """ pass
""" Plotting Functions """
[docs] def initializeRealTimePlot( self, plotHandle=None, axisHandle=None ): if plotHandle is None: self.RTPlotHandle = plt.figure() else: self.RTPlotHandle = plotHandle if axisHandle is None: self.RTPaxisHandle = plt.gca() else: self.RTPaxisHandle = axisHandle xAxis = np.linspace(0, self.__dimension__ - 1, self.__dimension__) self.RTPlotData, = plt.plot( xAxis, np.zeros(self.__dimension__) ) # plt.grid() plt.show(block=False) return
[docs] def realTimePlot( self, normalized=True, substateRange=None ): if self.RTPlotHandle is None: self.initializeRealTimePlot() stateDict = self.getStateVector() yAxis = stateDict['stateVector'] if substateRange: yAxis = yAxis[substateRange] if normalized is True: self.RTPaxisHandle.set_ylim([0, 1.1]) yAxis = yAxis - np.min(yAxis) try: yAxis = yAxis/np.max(yAxis) except: pass if 'xAxis' in stateDict: xAxis = stateDict['xAxis'] self.RTPaxisHandle.set_xlim([np.min(xAxis), np.max(xAxis)]) else: xAxis = np.linspace(0, self.__dimension__ - 1, self.__dimension__) self.RTPlotData.set_data(xAxis, yAxis) self.RTPlotHandle.canvas.draw() self.RTPlotHandle.canvas.flush_events() return
[docs] def timeHistoryPlot( self ): if self.THPlotHandle is None: self.initializeTimeHistoryPlot() for stateCounter in range(self.dimension()): self.THPlotDataList[stateCounter]['x'].append(self.stateVectorHistory[-1]['t']) self.THPlotDataList[stateCounter]['y'].append( self.stateVectorHistory[-1]['stateVector'][stateCounter] ) self.THPlotObjectList[stateCounter].set_data( self.THPlotDataList[stateCounter]['x'], self.THPlotDataList[stateCounter]['y'] ) self.THPlotHandle.canvas.draw() self.THPlotHandle.canvas.flush_events() return
[docs] def initializeTimeHistoryPlot( self, plotHandle=None ): if plotHandle is None: self.THPlotHandle = plt.figure() else: self.THPlotHandle = plotHandle # if axisHandle is None: # self.THPaxisHandle = plt.gca() # else: # self.THPaxisHandle = axisHandle self.THPaxisList = [] self.THPlotDataList = [] self.THPlotObjectList = [] self.THSigmaPlotObjectList = [] for stateCounter in range(self.dimension()): newAxis = plt.subplot2grid( (self.dimension(), 1), (stateCounter, 0) ) self.THPaxisList.append(newAxis) newAxisX = [svh['t'] for svh in self.stateVectorHistory] newAxisY = [svh['stateVector'][stateCounter] for svh in self.stateVectorHistory] newOneSigma = [np.sqrt(svh['covariance'].convertCovariance('covariance')[stateCounter][stateCounter].value) for svh in self.stateVectorHistory] newPlot, = plt.plot(newAxisX, newAxisY) # newSigmaPlotTop, = plt.plot(newAxisX, newOneSigma, ls='dotted',color='grey') # newSigmaPlotBottom, = plt.plot(newAxisX, -np.array(newOneSigma), ls='dotted',color='grey') # newOneSigmaPlot = (newSigmaPlotTop, newSigmaPlotBottom) self.THPlotObjectList.append(newPlot) # self.THSigmaPlotObjectList.append(newOneSigmaPlot) self.THPlotDataList.append( { 'x': newAxisX, 'y': newAxisY, # 'sigma': newOneSigma } ) plt.tight_layout() plt.show(block=False) return
[docs] def objectID(self): return self.__objectID__
def __repr__(self): return 'SubState(' + self.__objectIDSTR__ + 'substate ID: %s)' %self.__objectID__