Source code for modest.signals.poissonsource

import numpy as np
from math import isnan
# from scipy.stats import multivariate_normal
from . import signalsource
from abc import ABCMeta, abstractmethod


[docs]class PoissonSource(signalsource.SignalSource): def __init__( self, flux, startTime=0, useTOAprobability=True ): signalsource.SignalSource.__init__(self) self.lastTime = startTime self.useTOAprobability = useTOAprobability self.flux = flux return
[docs] def computeAssociationProbability( self, currentFlux, measurement ): time = measurement['t']['value'] dT = time - self.lastTime if self.useTOAprobability: probability = currentFlux else: probability = 1.0 return probability
[docs]class StaticPoissonSource(PoissonSource): def __init__( self, flux, startTime=0, useTOAprobability=True ): super().__init__( flux, startTime=startTime, useTOAprobability=useTOAprobability )
[docs] def computeAssociationProbability( self, measurement ): poissonProb = super().computeAssociationProbability( self.flux, measurement ) return(poissonProb)
[docs] def generateEvents( self, tMax, t0=0 ): nCandidates = np.int((tMax - t0) * self.flux * 1.1) # print(nCandidates) # Generate a batch of candidate arrival times (more efficient than generating on the fly) arrivalTimeArray = np.random.exponential(1.0/self.flux, nCandidates) poissonEvents = [] tLastEvent = t0 eventIndex = 0 while tLastEvent < tMax: # Draw the next arrival time and selection variable from our # pre-generated arrays if eventIndex < len(arrivalTimeArray): nextEvent = arrivalTimeArray[eventIndex] # If we run out, generate more on the fly else: nextEvent = np.random.exponential(1.0/self.flux) # print('Generating on the fly!') tNextEvent = ( tLastEvent + nextEvent ) if tNextEvent < tMax: poissonEvents.append(tNextEvent) tLastEvent = tNextEvent eventIndex = eventIndex+1 return poissonEvents
[docs]class DynamicPoissonSource(PoissonSource): __metaclass__ = ABCMeta def __init__( self, averageFlux, maxFlux=None, correlationStateName='correlation', startTime=0, useTOAprobability=True ): if maxFlux == None: maxFlux = averageFlux self.maxFlux = maxFlux self.correlationStateName = correlationStateName PoissonSource.__init__( self, averageFlux, startTime=startTime, useTOAprobability=useTOAprobability ) return
[docs] @abstractmethod def getSignal( self, t, tVar=None, state=None ): raise NotImplementedError( "The getSignal method is not implemented in " + "DynamicPoissonSource, and must be overridden." )
[docs] def computeAssociationProbability( self, measurement, stateDict ): state = None if self.correlationStateName in stateDict: state = stateDict[self.correlationStateName]['stateObject'] state = state.getStateVector() #print('Current TDOA std: %.2e' %np.sqrt(state['TDOAVar'])) if 'TDOAVar' in state: if not isnan(state['TDOAVar']): measuredTVar = measurement['t']['var'] + state['TDOAVar'] else: measuredTVar = measurement['t']['var'] # print('State TDOA var is Nan; excluding from TOA probability calculations') else: measuredTVar = measurement['t']['var'] # Hack to try and limit the erroneous locking behavior tVarScaleFactor = 1.0 # print('T var components') # print('Measurement tvar: %s' %measurement['t']['var']) # print('State tvar: %s' %state['TDOAVar']) # print('Current time: %s current tVar: %s' %(measurement['t']['value'], measuredTVar)) currentFlux = self.getSignal( measurement['t']['value'], tVar=measuredTVar, #state=state ) # print('Computed current flux %s' %currentFlux) poissonProb = super().computeAssociationProbability( currentFlux, measurement ) # print('Dynamic poisson TOA probability %s' %poissonProb) return(poissonProb)