Source code for modest.signals.periodicxraysource


## @file periodicxraysource.py
# @brief This file contains the PeriodicXRaySource class

import numpy as np
from math import factorial, isnan
import matplotlib.pyplot as plt

from . import pointsource
from . import poissonsource
from .. utils import spacegeometry as sg


[docs]class PeriodicXRaySource( poissonsource.DynamicPoissonSource, pointsource.PointSource ): """ PeriodicPoissonSource is a class which models the signal from a periodic poisson source, e.g. a pulsar. This class accepts a large number of initialization inputs required to model such a source. Alternatively, the user can pass a PAR file, and the object will initialize based on that. """ def __init__( self, profile=None, avgPhotonFlux=None, pulsedFraction=None, PARFile=None, movePeakToZero=True, normalizeProfile=True, phaseDerivatives=None, pulsarPeriod=None, RA=None, DEC=None, observatoryMJDREF=None, TZRMJD=None, PEPOCH=None, name=None, attitudeStateName='attitude', correlationStateName=None, useProfileColumn=None, startTime=0, extent=None, useTOAprobability=True, useFitted=True, detectorArea=1 ): # Store the user-passed arguments first. These take priority of # parameters received in the PAR file. self.useFitted = useFitted """ Some PAR files (specifically ones generated by us) contain two frequencies; a "fitted" frequency (computed by fitting a quadratic to the histogram max function) and a non fitted (computed by just taking the max of the histogram function). This flag specifies which one to use if both are present. """ self.detectorArea = detectorArea """ Allows user to specify area of detector by which flux is multiplied. Must be in compatible units as flux (i.e. if flux is in photons / cm^2 / s then detectorArea must be in cm^2). Default is 1. """ self.extent=None """ Specifies the spatial extent of the pulsar """ self.phaseDerivatives = phaseDerivatives """ Stores the derivatives of phase with respect to time phaseDerivatives is a dict which contains the derivatives of phase with respect to time. The 0th derivative of phase is simply a constant phase offset (zero by default). The 1st derivative of phase is frequency. Higher order derivatives reflect the change in frequency with respect to time. The dict keys should be integers which indicate the order of the derivative. e.g. >>> phaseDerivatives = {0: 0, 1: 29.7417, 2: -3.7184e-19, 3: 1.1949e-20} (Phase derivatives shown for Crab Nebula pulsar) """ self.fittedPhaseDerivatives = None """ Alternative phase derivatives, derived by fitting rather than taking max. """ self.TZRMJD = TZRMJD """ TZRMJD stores the Modified Julian Date (MJD) corresponding to t=0 seconds (for the purpose of computing the pulsar phase) """ self.PEPOCH = PEPOCH self.observatoryMJDREF = observatoryMJDREF """ observatoryMJDREF stores the MJD corresponding to t=0 seconds for photon observations """ self.name = name """ name is an arbitrary string that is used to identify the signal. Can be set to whatever the user wants, since it is only used for display purposes. Usually set to the astrophysical signal's name (i.e. "Crab Pulsar" or "PSR B0531+21") """ self.avgPhotonFlux = avgPhotonFlux """ averagePhotonFlux is the mean value of photons expected to # arrive from the signal per second (units of photons/second) """ self.pulsedFraction = pulsedFraction """ pulsedFraction is a value from 0 to 1 which indicates what percentage of the avgPhotonFlux is "pulsed" and what percentage is constant/background """ self.covRA = None """ Spatial covariance of source in right-ascension """ self.covDEC = None """ Spatial covariance of source in declination """ self.covX = None """ Cross-correlation of spatial covariance terms """ self.fluxERG = None """ Energy flux in ergs per unit area """ self.fluxKEV=None """ Energy flux in KEV per unit area """ self.profile=None """ profile is a numpy array containing the numerical value of flux over a single period of the signal. The profile array contains the signal profile of the pulsar (or periodic source) being modeled. If the user selected to normalize the profle, then the profile will be normalized from zero to one, and then scaled based on the average flux value. If the profile is not normalized, then the raw values will be used for computing the signal. If the profile is normalized but no average flux value is received, a warning will be issued. """ # Process the PAR file, if received. Give priority to parameters # passed directly as init arguments. if PARFile is not None: self.PARFile = PARFile PAR_RA, PAR_Dec = self.processPARFile(PARFile) if (RA is None): RA = PAR_RA if (DEC is None): DEC = PAR_Dec # Check to see if fitted phase derivatives were stored. IF not, don't try to use them. if self.useFitted: if self.fittedPhaseDerivatives == None: self.useFitted = False if np.all([self.covRA, self.covDEC, self.covX]): self.extent = np.array([ [self.covRA, self.covX], [self.covX, self.covDEC] ]) # Initialize PointSource with Right ascension and declination values pointsource.PointSource.__init__( self, RA, DEC, attitudeStateName=attitudeStateName, extent=self.extent ) # Check to make sure that we received either a phaseDerivatives dict # or a value for pulsar period if ( (self.phaseDerivatives is None) and (self.fittedPhaseDerivatives is None) and (pulsarPeriod is None) ): raise ValueError( "Not enough timing information was received to initialize a " + "pulsar signal. One of the following inputs is required: \n" + "- pulsarPeriod \n" + "- phaseDerivatives" + "- PARFile containing frequency information" ) self.normalizeProfile = normalizeProfile """ normalizeProfile is a boolean flag used to indicate whether the profile is normalized from zero to one. If so, then the signal should be multiplied by #peakAmplitude. """ # Process whatever was passed as the profile if profile is not None: self.processProfile(profile, normalizeProfile, movePeakToZero, useProfileColumn) self.TZeroDiff = 0 """ TZeroDiff is the time difference between the pulsar's t zero (i.e. PEPOCH) and the observatory's tZero. This amount is subtracted from the observatory time in calculations of flux, period, etc. to sync observatory time with the pulsar parameter's reference time """ if (self.PEPOCH is not None) and (self.observatoryMJDREF is not None): self.TZeroDiff = (self.PEPOCH - self.observatoryMJDREF)*24*60*60 else: self.TZeroDiff = 0 if correlationStateName is None: correlationStateName = self.name # Update the pulsar period and time array. self.pulsarPeriod = 0 """ pulsarPeriod is the amouont of time (in seconds) for one complete pulsar pulse. """ if self.useFitted: self.pulsarPeriod = 1/self.fittedPhaseDerivatives[1] else: self.pulsarPeriod = 1/self.phaseDerivatives[1] self.computeSinglePeriodIntegral() poissonsource.DynamicPoissonSource.__init__( self, self.avgPhotonFlux*detectorArea, maxFlux=self.peakAmplitude, correlationStateName=correlationStateName, startTime=startTime, useTOAprobability=useTOAprobability ) return
[docs] def processProfile( self, profile, normalizeProfile=True, movePeakToZero=True, useProfileColumn=None ): r""" processProfile reads in a series of data points representing the pulsar's pulse profile, and processes it for use in generating signals. Args: profle: Either a string pointing to a file, or a list of values representing the profile normalizeProfile (bool): Determines whether to normalize the profile to be between zero and one. movePeakToZero (bool): Determines whether the max value of the profile will be shifted to zero. This is fairly standard convention. useProfileColumn (int): Optionally, determine which column of the profile file is contains the profile information. Different profile files have various numbers of columns. By default, method will use the last column. """ # If a string is received, assume this points to a file and try to # open it and import the data. if type(profile) is str: profileArray = np.loadtxt(profile) if useProfileColumn is None: profileArray = profileArray[:, len(profileArray[0]) - 1] else: profileArray = profileArray[:, int(useProfileColumn)] profile = profileArray if normalizeProfile is True: if (self.avgPhotonFlux is None) or (self.pulsedFraction is None): raise Warning( "You are normalizing the profile from zero to one, but " + "you haven't given values for average photon flux or " + "pulsed fraction. This will result in a signal which " + "is most likely not scaled properly." ) profile = profile - np.min(profile) profile = profile/np.max(profile) if movePeakToZero is True: if np.argmax(profile) != 0: profile = np.roll(profile, -np.argmax(profile)) profile = np.append(profile, profile[0]) self.profile = profile self.profileIndex = np.linspace(0, 1, len(self.profile)) return
[docs] def processPARFile( self, PARFile, replaceCurrentValues=False ): r""" Read a PAR file passed by the user and change/initialize object attributes accordingly. `PAR files <http://www.jb.man.ac.uk/~pulsar/Resources/tempo_usage.txt>`_ are a semi-standardized way to store information about pulsars. Most PAR files contain, at a minimum, the name of the pulsar, the sky coordinates of the pulsar (i.e. right ascension and declination) and frequency information about the pulsar. They may also contain other helpful information such as reported glitches, pulsar binary model information, etc. Rather than force the user to enter all this information manually, the object can initialize from a PAR file and gather the relevant information on its own. In order to further simplify the pulsar object initialization pipeline, this method also processes additional information found in the PAR file which isn't part of the standard PAR file format. This includes information like source extent (i.e. how big is the object in the sky), source flux (photons per second per unit area, energy per second per unit area), pulsed fraction, and pulse profile. This information is not normally included in a PAR file. However, it is relatively easy to add it manually, and this simplifies workflow by keeping all the relevant information about the pulsar in one file. Args: PARFile (str): Path to PAR file to be read replaceCurrentValues (bool): Determines whether previously stored values will be overwritten by values found in the PAR file Returns: (float, float): Tuple containing the right ascension and declination of the source found in the PAR file """ # Read PAR file, and split into lines parTextFile = open(PARFile, "r") lines = parTextFile.read().split('\n') PARPhaseDerivatives = {0: 0} PARFittedPhaseDerivatives = None newProfile = [] for line in lines: # Split the line into a list of strings, and strip the # whitespace splitLine = line.strip().split() # If the line contains relevant information, store it. if len(splitLine) > 0: # FREQUENCY # If line contains frequency information, the first string # should be of the format "F<i>" where i is an integer. if ( (splitLine[0][0] == 'F') and (len(splitLine[0]) == 2) ): # Extract the order of derivative of the phase. # e.g. F0 is the 1st derivative of phase, F1 is the # 2nd derivative of phase, etc. freqOrder = int(splitLine[0][1]) + 1 PARPhaseDerivatives[freqOrder] = float(splitLine[1]) # FITTED FREQUENCY # If line contains fitted frequency information, the first string # should be of the format "F<i>_f" where i is an integer. if ( (splitLine[0][0] == 'F') and (splitLine[0][-2:] == '_f') ): if PARFittedPhaseDerivatives == None: PARFittedPhaseDerivatives = {0: 0} # Extract the order of derivative of the phase. # e.g. F0 is the 1st derivative of phase, F1 is the # 2nd derivative of phase, etc. freqOrder = int(splitLine[0][1]) + 1 PARFittedPhaseDerivatives[freqOrder] = float(splitLine[1]) # RIGHT ASCENSION elif ((splitLine[0] == 'RAJ') or (splitLine[0] == 'RA')): # PAR files store right ascension as HH:MM:SS, so split # on the ":" character hmsArray = splitLine[1].split(':') # print(hmsArray) PAR_RA = ( sg.hms2rad( float(hmsArray[0]), float(hmsArray[1]), float(hmsArray[2]) ) ) # DECLINATION elif ((splitLine[0] == 'DECJ') or (splitLine[0] == 'DEC')): # Split on ":" and convert to radians dmsArray = splitLine[1].split(':') PAR_Dec = ( sg.dms2rad( float(dmsArray[0]), float(dmsArray[1]), float(dmsArray[2]) ) ) # T-zero Mod Julian Date elif (splitLine[0] == 'TZRMJD'): PAR_TZRMJD = float(splitLine[1]) if (self.TZRMJD is None) or (replaceCurrentValues is True): self.TZRMJD = PAR_TZRMJD # PEPOCH Mod Julian Date elif (splitLine[0] == 'PEPOCH'): PAR_PEPOCH = float(splitLine[1]) if (self.PEPOCH is None) or (replaceCurrentValues is True): self.PEPOCH = PAR_PEPOCH # Spatial covariance matrix, right ascension elif (splitLine[0] == 'COV_RA'): covRA = float(splitLine[1]) if (not self.covRA) or (replaceCurrentValues): self.covRA = covRA # Spatial covariance matrix, declination elif (splitLine[0] == 'COV_DEC'): covDEC = float(splitLine[1]) if (not self.covDEC) or (replaceCurrentValues): self.covDEC = covDEC # Spatial covariance matrix, cross correlation elif (splitLine[0] == 'COV_X'): covX = float(splitLine[1]) if (not self.covX) or (replaceCurrentValues): self.covX = covX # Pulsar Name elif ((splitLine[0] == 'PSRJ') or (splitLine[0] == 'PSR')): if (self.name is None) or (replaceCurrentValues is True): self.name = splitLine[1] # Energy flux in ergs per unit area elif (splitLine[0] == 'FLX_ERG'): if (self.fluxERG is None) or (replaceCurrentValues is True): self.fluxERG = float(splitLine[1]) # Energy flux in KEV per unit area elif (splitLine[0] == 'FLX_KEV'): if (self.fluxKEV is None) or (replaceCurrentValues is True): self.fluxKEV = float(splitLine[1]) # Photon flux per unit area elif (splitLine[0] == 'FLX_PH'): if (self.avgPhotonFlux is None) or (replaceCurrentValues is True): self.avgPhotonFlux = float(splitLine[1]) # Fraction of photon flux that is pulsed elif (splitLine[0] == 'P_FRAC'): if (self.pulsedFraction is None) or (replaceCurrentValues is True): self.pulsedFraction = float(splitLine[1]) # Profile (assumed to be in order elif (splitLine[0] == 'PROFILE'): newProfile.append(float(splitLine[1])) if ( (self.phaseDerivatives is None) or (replaceCurrentValues is True) ): self.phaseDerivatives = PARPhaseDerivatives if ( (self.fittedPhaseDerivatives is None) or (replaceCurrentValues is True) ): self.fittedPhaseDerivatives = PARFittedPhaseDerivatives if len(newProfile) > 0: # plt.figure() # plt.plot(newProfile) # plt.show(block=False) # print(self.profile) if replaceCurrentValues or (not np.any(self.profile)): self.processProfile(newProfile) return(PAR_RA, PAR_Dec)
[docs] def computeSinglePeriodIntegral( self ): r""" Compute the integral as a function of time of the pulsar flux. This will be used later to compute expected value of flux in the case where time is uncertain. Generally this function only needs to be called once at initialization; once the integral is computed it is stored as an attribute and can be accessed on demand. Integral is computed using trapezoidal integration. """ self.singlePeriodIntegral = np.zeros(len(self.profile)) """ An array containing the numerically integrated flux over a single period """ singlePeriodTimeArray = np.linspace( 0, self.pulsarPeriod, len(self.profile) ) self.profileLen = len(self.profile) for i in range(len(self.profile)): self.singlePeriodIntegral[i] = np.trapz( self.profile[0:i + 1], singlePeriodTimeArray[0:i + 1], axis=0 ) # Store the total flux integral over one period fluxIntegral = self.singlePeriodIntegral[-1] # Compute the scaling factor based on the average photon flux (if the # average flux was given) if self.avgPhotonFlux is not None: self.photonsPerPeriod = ( self.avgPhotonFlux * self.pulsarPeriod * self.detectorArea ) """ Expected number of photons for a given pulsar period """ self.peakAmplitude = self.photonsPerPeriod / fluxIntegral """ Maximum amplitude of the pulsar signal. Scaled so that the average flux is equal to the value given in initialization """ # If some of the flux is unpulsed, scale the peak amplitude # accordingly and compute a background rate to account for the # unpulsed portion if self.pulsedFraction is not None: self.scaleFactor = self.peakAmplitude * self.pulsedFraction """ Amount by which the signal is to be multiplied to get the correct flux values """ self.backgroundCountRate = ( self.avgPhotonFlux * self.detectorArea * (1 - self.pulsedFraction) ) """ Flux of the unpulsed portion of the signal. Not to be confused with other background sources. """ else: self.backgroundCountRate = 0 self.scaleFactor = self.peakAmplitude else: self.peakAmplitude = np.max(self.profile) self.scaleFactor = 1.0 self.backgroundCountRate = 0 return
[docs] def getPhase( self, observatoryTime ): r""" Compute the current phase of the pulsar signal. This method uses all available frequency/phase information including derivatives. However, it does not check for validity of the time passed. Some PAR files are valid over only a specific region of time, and this function will not verify that the time given is within that time window. It is up to the user to verify the validity of the results. Args: observatoryTime (float): Local time at which flux is to be computed. Returns: (float): Current pulsar phase """ phase = 0 shiftedObservatoryTime = observatoryTime - self.TZeroDiff if self.useFitted: myPhaseDerivatives = self.fittedPhaseDerivatives else: myPhaseDerivatives = self.phaseDerivatives for order in myPhaseDerivatives: phase = ( phase + ( myPhaseDerivatives[order] * np.power(shiftedObservatoryTime, order) / factorial(order) ) ) return(phase)
[docs] def getFrequency( self, observatoryTime ): r""" Compute the current frequency of the pulsar signal This function uses all available frequency information including derivatives to compute the current frequency of the pulsar signal at the given time. However it does not check for time validity. Some PAR files are valid over only a specified time frame; it is up to the user to verify whether the time given is within that region. See Also: :meth:`getPeriod` Args: observatoryTime (float): Time for which frequency is to be computed Returns: (float): Computed frequency in Hz """ frequency = 0 shiftedObservatoryTime = observatoryTime - self.TZeroDiff if self.useFitted: myPhaseDerivatives = self.fittedPhaseDerivatives else: myPhaseDerivatives = self.phaseDerivatives for order in myPhaseDerivatives: if order > 0: frequency = ( frequency + ( myPhaseDerivatives[order] * np.power(shiftedObservatoryTime, order-1)/ factorial(order - 1) ) ) return frequency
[docs] def getPeriod( self, observatoryTime ): r""" Compute the current period of the pulsar signal This function uses all available frequency information including derivatives to compute the current period of the pulsar signal at the given time. However it does not check for time validity. Some PAR files are valid over only a specified time frame; it is up to the user to verify whether the time given is within that region. See Also: :meth:`getFrequency` Args: observatoryTime (float): Time for which period is to be computed Returns: (float): Computed period in seconds """ return 1/self.getFrequency(observatoryTime)
[docs] def getSignalMJD( self, MJD ): r""" getSignalMJD is a wrapper function that returns the photon flux at a given Modified Julian Date See Also: :meth:`getSignal` Args: MJD (float): The Modified Jullian Date for which flux is to be returned Returns: (float): The signal at the requested date """ observatoryTime = self.MJD2seconds(MJD) return (self.getSignal(observatoryTime))
[docs] def getSignal( self, observatoryTime, tVar=None, #state=None ): r""" getSignal is responsible for returning the photon flux from the pulsar at a given time The getSignal method is the method which is responsible for returning the current value of flux from the signal source at a given time. If there is uncertainty in the time, then the expected value of the signal is returned. Uncertainty in time is indicated by the optional tVar argument. In every case, the method calls the getPhase method to determine the current signal source phase. If no uncertainty in time is passed, then the getPulseFromPhase method is called to lookup/interpolate the current flux based on the phase and the signal :attr:`profile`. If a value is passed for tVar, then the process is more complicated. Rather than simply look up the signal value from the phase, the function returns an approximation of the expected value of the signal, given the mean and variance of time. Normally, the expected value would be computed by integrating the product of the true distribution of time (probably a normal distribution) with the flux as a function of time. However, due to the non-analytical nature of the flux function, the direct computation of this integral is intractable. To overcome this limitation, the time distributuion is approximated as a moment-matched uniform distribution. Using this approximation, the approximate integral may be directly computed simply by looking up the start and end values of :attr:`singlePeriodIntegral`, and dividing by the appropriate scaling factor. Args: observatoryTime (float): The time for which to compute the signal tVar (float): The variance of the time estimate (optional) Returns: (float): The signal at the requested time, in photons/second """ # if state is not None: # if 'signalDelay' in state: # delay = state['signalDelay'] # if 'delayVar' in state: # delayVar = state['delayVar'] # else: # delayVar = 0 # observatoryTime = observatoryTime + delay # tVar = tVar + delayVar # Get the phase corresponding to the current time phase = self.getPhase(observatoryTime) # If a value was received for the tVar, then we compute the expected # value of flux if tVar is not None: # Get standard deviation of t tSigma = np.sqrt(tVar) # Convert the time standard deviation to phase standard deviation phaseSigma = tSigma/self.pulsarPeriod # Check to see if the phase std is bigger than the std # corresponding to a uniform distribution with support = 1. If so, # this indicates that we effectively have no meaningful knowledge # of phase, and can just return the average flux. if phaseSigma > np.sqrt(1/12): signal = self.avgPhotonFlux * self.pulsedFraction * self.detectorArea elif phaseSigma < 1/(100 * self.profileLen): signal = self.getPulseFromPhase(phase) else: phaseFraction = np.mod(phase, 1.0) upperSigma = phaseFraction + (np.sqrt(12) * phaseSigma / 2) lowerSigma = phaseFraction - (np.sqrt(12) * phaseSigma / 2) upperSigmaOffset = 0 lowerSigmaOffset = 0 if upperSigma > 1: upperSigma = upperSigma - 1 upperSigmaOffset = self.singlePeriodIntegral[-1] if lowerSigma < 0: lowerSigma = lowerSigma + 1 lowerSigmaOffset = self.singlePeriodIntegral[-1] signal = ( upperSigmaOffset + np.interp( upperSigma, self.profileIndex, self.singlePeriodIntegral ) - np.interp( lowerSigma, self.profileIndex, self.singlePeriodIntegral) + lowerSigmaOffset ) signal = ( signal / (np.sqrt(12) * tSigma) ) signal = signal * self.scaleFactor else: signal = self.getPulseFromPhase(phase) if self.backgroundCountRate is not None: signal = signal + self.backgroundCountRate return(signal)
[docs] def signalIntegral( self, tStart, tStop, state=None ): r""" Computes the definite integral of the signal over a given time interval. This function computes the definite integral of flux over the specified time interval. Mathematically: .. math:: \Lambda(t_0, T) = \int_{t_0}^T \lambda(t) dT The function uses the periodic nature of the signal in conjuction with the previously computed :attr:`singlePeriodIntegral` to simplify the computation. Specifically, the integral needs to be computed only once. This is a precise expression for the number of photons expected during that time interval. For relatively small time intervals, the integral may be reasonably approximated by .. math:: \Lambda(T, T+\Delta T) = \int_{T}^{T+\Delta T} \lambda(t) dT \approx \lambda(t) \Delta T In that case, this function is not ideal, as it increases computational expense without much added benefit. However in the case of large time intervals, this function will give more accurate results. Args: tStart (float): Start time of the definite integralTStart tStop (float): Stop time of the definite integral state: Optionally, a state object which contains a signal delay Returns: (float): The definite integral of the signal. """ if state is not None: if 'signalDelay' in state: delay = state['signalDelay'] tStart = tStart + delay tStop = tStop + delay # Get the phase corresponding to the current time phaseStart = self.getPhase(tStart) phaseStop = self.getPhase(tStop) completeCycles = np.floor(phaseStop-phaseStart) phaseStartFraction = np.mod(phaseStart, 1.0) phaseStopFraction = np.mod(phaseStop, 1.0) integralTStart = np.interp( phaseStartFraction, self.profileIndex, self.singlePeriodIntegral ) integralTStop = np.interp( phaseStopFraction, self.profileIndex, self.singlePeriodIntegral ) phaseFractionIntegral = integralTStop - integralTStart if phaseFractionIntegral < 0: phaseFractionIntegral = ( phaseFractionIntegral + self.singlePeriodIntegral[-1] ) signalIntegral = ( phaseFractionIntegral + self.singlePeriodIntegral[-1] * completeCycles ) signalIntegral = signalIntegral * self.scaleFactor if self.backgroundCountRate is not None: signalIntegral = ( signalIntegral + self.backgroundCountRate * (tStop - tStart) ) return signalIntegral
[docs] def getPulseFromPhase(self, phase): r""" Returns the value of the signal given the signal phase This function uses the signal :attr:`profile` to compute the value of the signal given the signal phase. It uses simple linear interpolation to find the value. Args: phase (float): The phase number of the signal. Phase can be any numerical value; the fractional phase will be computed by taking remainder of the value divided by 1. Returns: (float): The value of the signal at the given phase """ pFrac = np.mod(phase, 1.0) signal = np.interp(pFrac, self.profileIndex, self.profile) signal = signal * self.scaleFactor return signal
[docs] def MJD2seconds( self, MJD ): r""" A simple utility function to convert modified Julian date to pulsar time See Also: :meth:`seconds2MJD` Args: MJD (float): Modified Julian Date Returns: (float): Time in seconds """ return (MJD - self.PEPOCH) * (24.0 * 60.0 * 60.0)
# return (MJD - self.TZRMJD) * (24.0 * 60.0 * 60.0)
[docs] def seconds2MJD( self, seconds ): r""" A simple utility function to convert seconds to Modified Julian Date See Also: :meth:`MJD2seconds` Args: seconds (float): Time in seconds Returns: (float): Time in Modified Julian Date """ return self.PEPOCH + (seconds/(24.0 * 60.0 * 60.0))
# return self.TZRMJD + (seconds/(24.0 * 60.0 * 60.0))
[docs] def computeAssociationProbability( self, measurement, stateDict, validationThreshold=0 ): r""" Computes the probability of association This method computes the probability of association of a signal given the measurement and prior state. This is a conditional probability based on Bayes rule. Bayes rule is given by .. math:: \textrm{Pr}\left[A_p | \mathbf{y}\right] = \frac{\textrm{Pr}\left[\mathbf{y}|A_p\right] \textrm{Pr}\left[A_p\right]}{\textrm{Pr}\left[\mathbf{y}\right]} Generally, the denominator is not directly computed and will be divided out. For a periodic point source, there are two possible measurements: angle of arrival and time of arrival. If both of these measurements are included in the measurement dictionary, the probability will be computed as .. math:: \textrm{Pr}\left[A_p | (\alpha, t)\right] = \mathcal{N}\left[\alpha, \mathbf{S}_\alpha \right] \lambda(t) \bar{\lambda} These sub-components of the probability are not computed here; rather they are computed by the :class:`~modest.signals.poissonsource.PoissonSource` and :class:`~modest.signals.pointsource.PointSource` parent classes from which this class inherits. So, if something needs to be changed about the computation of association probability, it is likely that it needs to be changed in the parent classes. If either of these measurements is not included in the measurement dict, then that component of the probability will be set to 1. Note: The probability computed here is not an "absolute" probability. It *only* has meaning in relation to other probabilities. In order for this number to be meaningful, it should be normalized with all other possible association probabilities so that their sum is 1. Also note that the probability returns contains :math:`\delta \alpha` and :math:`\delta t` differential terms. These differential terms are divided out in the normalization process. *All the other probabilities must contain these same terms for the comparison to be meaningful and the normalization to be valid.* Args: measurement (dict): A dictionary containing the measured values and their variances stateDict (dict): A dictionary containing the substates and their values validationThreshold (float): Unused Returns: (float): The probability of association """ anglePR = pointsource.PointSource.computeAssociationProbability( self, measurement, stateDict, validationThreshold ) poisPR = poissonsource.DynamicPoissonSource.computeAssociationProbability( self, measurement, stateDict ) # poisPR = 1.0 myPr = (anglePR * poisPR * self.flux) if isnan(myPr): raise ValueError( 'Computed NaN probability. Components: AOA %s, TOA %s, Flux %s' %(anglePR, poisPR, self.peakAmplitude) ) return myPr
[docs] def plot(self, nPeriods=1, tVar=None, figureHandle=None, nPoints=1000 ): """ Plots the pulsar profile This is a simple visualization function that generates a plot of the pulsar profile over a specified number of pulsar periods. Optionally, you can also specify a time uncertainty to see how the *effective* profile changes with time uncertainty. Args: nPeriods (float): Number of periods to plot tVar (float): Variance of time uncertainty. If set to None then no uncertainty is included. figureHandle: Optional handle to an existing figure nPoints (int): Number of points to plot """ if figureHandle is None: plt.figure() tArray = np.linspace( 0, self.pulsarPeriod * nPeriods, nPoints * nPeriods) signalArray = np.zeros(nPoints * nPeriods) for index in range(len(signalArray)): signalArray[index] = self.getSignal(tArray[index], tVar) plt.plot(tArray, signalArray) plt.show(block=False)
[docs] def generatePhotonArrivals( self, tMax, t0=0, timeOffset=None, spacecraft=None, ): """ Generate photon arrival measurements This function generates photon arrivals from the pulsar signal based on the parameters of the simulation. The """ if not timeOffset: timeOffset = 0 nCandidates = np.int((tMax - t0) * self.peakAmplitude * 1.1) # Generate a batch of candidate arrival times (more efficient than generating on the fly) candidateTimeArray = np.random.exponential(1.0/self.peakAmplitude, nCandidates) selectionVariableArray = np.random.uniform(0, 1, nCandidates) photonMeasurements = [] photonArrivalTimes = [] tLastCandidate = t0 # # If we have a position, then we want to get the signal at T0 at that # # position, not the SSB. So, shift T0 accordingly. # if position is not None: # rangeDeltaT = self.unitVec().dot(position(t0)) / self.speedOfLight() # tLastCandidate = tLastCandidate + rangeDeltaT attitude = None if spacecraft is not None: if hasattr(spacecraft, 'dynamics'): myRangeFunction = spacecraft.dynamics.getRangeFunction( self.unitVec(), tMax ) attitude = spacecraft.dynamics.attitude else: def myRangeFunction(t): return (0) if hasattr(spacecraft, 'detector'): TOA_StdDev = spacecraft.detector.TOA_StdDev AOA_StdDev = spacecraft.detector.AOA_StdDev FOV = spacecraft.detector.FOV else: TOA_StdDev = None AOA_StdDev = None FOV = None else: def myRangeFunction(t): return 0 TOA_StdDev = None AOA_StdDev = None attitude = None FOV = None candidateIndex = 0 # plt.figure() # plt.plot(np.linspace(0,tMax,1000), myRangeFunction(np.linspace(0,tMax,1000))) # plt.show(block=False) while tLastCandidate < tMax: # Draw the next arrival time and selection variable from our # pre-generated arrays if candidateIndex < len(candidateTimeArray): nextCandidate = candidateTimeArray[candidateIndex] selectionVariable = selectionVariableArray[candidateIndex] # If we run out, generate more on the fly else: nextCandidate = np.random.exponential(1.0/self.peakAmplitude) selectionVariable = np.random.uniform(0, 1) tNextCandidate = ( tLastCandidate + nextCandidate ) rangeDeltaT = myRangeFunction(tNextCandidate) / self.speedOfLight() # rangeDeltaT = 0 currentFluxNormalized = ( self.getSignal(tNextCandidate + rangeDeltaT)/self.peakAmplitude ) # This if statement uses a uniform variable to determine whether # the next generated photon arrival time is a real photon arrival # time. if selectionVariable <= currentFluxNormalized: # if position is not None: # rangeDeltaT = ( # self.unitVec().dot(position(tNextCandidate - rangeDeltaT)) / # self.speedOfLight() # ) # else: # rangeDeltaT = 0 # newPhotonArrivalTime = tNextCandidate - rangeDeltaT # photonArrivalTimes.append(tNextCandidate - rangeDeltaT) newPhotonArrivalTime = tNextCandidate photonArrivalTimes.append(tNextCandidate) if TOA_StdDev: newPhotonArrivalTime = ( newPhotonArrivalTime + np.random.normal(scale=TOA_StdDev) ) measurementDict = { 't': { 'value': newPhotonArrivalTime - timeOffset, 'var': np.square(TOA_StdDev) }, 'name': self.name } else: measurementDict = { 't': {'value': newPhotonArrivalTime - timeOffset}, 'name': self.name } if attitude is not None: measurementDict.update( self.generateArrivalVector( attitude(newPhotonArrivalTime), AOA_StdDev=AOA_StdDev) ) # measurementDict = { # 't': {'value': newPhotonArrivalTime}, # 'unitVec': {'value': unitVecMeas}, # 'RA': {'value': RaMeas}, # 'DEC': {'value': DecMeas}, # 'name': self.name # } photonMeasurements.append(measurementDict) tLastCandidate = tNextCandidate candidateIndex = candidateIndex + 1 return photonMeasurements
# @staticmethod # def hms2rad(h, m, s): # hours = h + m / 60.0 + s / 3600.0 # return 2.0 * np.pi * hours / 24.0 # @method # def dms2rad(d, m, s): # degrees = d + m / 60.0 + s / 3600.0 # return 2.0 * np.pi * degrees / 360.0
[docs] def speedOfLight( self ): return (299792.458)
[docs] def FWHM(self): maxValue = self.profile[0] halfMax = maxValue / 2 upperIndex = 0 lowerIndex = len(self.profile) - 1 while self.profile[upperIndex] > halfMax: upperIndex += 1 upperIndex += -1 upperHalfIndex = ( upperIndex + ( (halfMax - self.profile[upperIndex]) / ( self.profile[upperIndex + 1] - self.profile[upperIndex] ) ) ) while self.profile[lowerIndex] > halfMax: lowerIndex += -1 lowerHalfIndex = ( lowerIndex + ( (halfMax - self.profile[lowerIndex]) / ( self.profile[lowerIndex + 1] - self.profile[lowerIndex] ) ) ) lowerHalfIndex = lowerHalfIndex - len(self.profile) FWHMIndex = upperHalfIndex - lowerHalfIndex return FWHMIndex * self.pulsarPeriod / len(self.profile)