Source code for modest.signals.pointsource

import numpy as _np
from scipy.stats import multivariate_normal as _mvn
from . import signalsource
from .. utils import spacegeometry as sg


[docs]class PointSource(signalsource.SignalSource): def __init__( self, RA, DEC, extent=None, attitudeStateName='attitude', useUnitVector=True ): self.useUnitVector = useUnitVector signalsource.SignalSource.__init__(self) self.__RA__ = RA self.__DEC__ = DEC self.__RaDec__ = {'RA': RA, 'DEC': DEC} self.attitudeStateName = attitudeStateName self.lastPDF = None self.extent = extent return
[docs] def RaDec(self): return(self.__RaDec__)
[docs] def unitVec( self, RaDec=None): if RaDec is None: RaDec = self.__RaDec__ cosD = _np.cos(RaDec['DEC']) sinD = _np.sin(RaDec['DEC']) cosRA = _np.cos(RaDec['RA']) sinRA = _np.sin(RaDec['RA']) return _np.array([cosD * cosRA, cosD * sinRA, sinD])
[docs] def computeAssociationProbability( self, measurement, stateDict, validationThreshold=0 ): if ( ('RA' in measurement) and ('DEC' in measurement) and (self.attitudeStateName in stateDict) ): attitudeState = stateDict[self.attitudeStateName]['stateObject'] measurementMatrices = attitudeState.getMeasurementMatrices( measurement, source=self, useUnitVector=self.useUnitVector ) P = attitudeState.covariance() # Convert P from a covariance container to a plain matrix in covariance form P = P.convertCovariance('covariance').value if self.useUnitVector: H = measurementMatrices['H']['unitVector'] R = measurementMatrices['R']['unitVector'] dY = measurementMatrices['dY']['unitVector'] else: H = measurementMatrices['H']['RaDec'] R = measurementMatrices['R']['RaDec'] dY = measurementMatrices['dY']['RaDec'] residualVariance = H.dot(P).dot(H.transpose()) + R # try: uniformProbability = 1/(4 * _np.pi) maxProb = ( 1 / _np.sqrt( _np.power((2*_np.pi), len(dY)) * _np.linalg.det(residualVariance) ) ) # print(measurementMatrices) # print(residualVariance) # maxProb = 1/_np.sqrt(_np.linalg.det(2 * _np.pi * residualVariance)) if maxProb < uniformProbability: # print("using uniform probability") probability = uniformProbability else: # if self.lastPDF: # if self.lastPDF['stateVectorID'] == stateDict['stateVectorID']: # probability = self.lastPDF['dist'].pdf(dY) # else: # self.lastPDF = { # 'stateVectorID': stateDict['stateVectorID'], # 'dist': _mvn(cov=residualVariance expArg = -dY.dot(_np.linalg.inv(residualVariance)).dot(dY)/2 if expArg < -1e1: probability = 0 else: probability = ( maxProb * _np.exp(expArg) ) # Altprobability = _mvn.pdf(dY, cov=residualVariance, allow_singular=True) # if _np.abs(Altprobability - probability) > 1: # raise ValueError( # 'Got different probabilities: ' + # '%s and %s' # %(Altprobability, probability) # ) # print('AOA probability: %s' %probability) # print('max Prob: %s' %maxProb) # print('dY: %s' %dY) # print('var: %s' %residualVariance) # except: # probability = 0 # print('Error computing probability; setting to zero') # print('P:') # print(P) # print('H:') # print(H) # print('R:') # print(R) # print('S:') # print(residualVariance) # print('dY:') # print(dY) # raise ValueError( # 'Error computing probability.' # ) # print('') else: probability=1 return(probability)
[docs] def generateArrivalVector( self, attitudeQ, AOA_StdDev=None ): if hasattr(attitudeQ, '__len__'): measurement = [] for attIndex in range(len(attitudeQ)): measurement.append(self.generateArrivalVector(attitudeQ[attIndex])) else: attitudeMatrix = attitudeQ.rotation_matrix.transpose() unitVecMeas = attitudeMatrix.dot(self.unitVec()) RaMeas, DecMeas = sg.unitVector2RaDec(unitVecMeas) # If we were given a value for angle of arrival standard deviation, # then corrupt the measurements with noise. Otherwise, simply # return the true values. if AOA_StdDev: if _np.isscalar(self.extent): RaMeas = RaMeas + _np.random.normal(scale=(AOA_StdDev + self.extent)) DecMeas = DecMeas + _np.random.normal(scale=(AOA_StdDev + self.extent)) else: RaMeas = RaMeas + _np.random.normal(scale=(AOA_StdDev + _np.sqrt(self.extent[0,0]))) DecMeas = DecMeas + _np.random.normal(scale=(AOA_StdDev + _np.sqrt(self.extent[1,1]))) measurement = { 'unitVec': {'value': sg.sidUnitVec(RaMeas, DecMeas)}, 'RA': { 'value': RaMeas, 'var': _np.square(AOA_StdDev) }, 'DEC': { 'value': DecMeas, 'var': _np.square(AOA_StdDev) } } else: measurement = { 'unitVec': {'value': unitVecMeas}, 'RA': { 'value': RaMeas }, 'DEC': { 'value': DecMeas } } return measurement