## @file periodicxraysource.py
# @brief This file contains the PeriodicXRaySource class
import numpy as np
from math import factorial, isnan
import matplotlib.pyplot as plt
from . import pointsource
from . import poissonsource
from .. utils import spacegeometry as sg
[docs]class PeriodicXRaySource(
poissonsource.DynamicPoissonSource,
pointsource.PointSource
):
"""
PeriodicPoissonSource is a class which models the signal from a
periodic poisson source, e.g. a pulsar.
This class accepts a large number of initialization inputs required to
model such a source. Alternatively, the user can pass a PAR file, and the
object will initialize based on that.
"""
def __init__(
self,
profile=None,
avgPhotonFlux=None,
pulsedFraction=None,
PARFile=None,
movePeakToZero=True,
normalizeProfile=True,
phaseDerivatives=None,
pulsarPeriod=None,
RA=None,
DEC=None,
observatoryMJDREF=None,
TZRMJD=None,
PEPOCH=None,
name=None,
attitudeStateName='attitude',
correlationStateName=None,
useProfileColumn=None,
startTime=0,
extent=None,
useTOAprobability=True,
useFitted=True,
detectorArea=1
):
# Store the user-passed arguments first. These take priority of
# parameters received in the PAR file.
self.useFitted = useFitted
"""
Some PAR files (specifically ones generated by us) contain two
frequencies; a "fitted" frequency (computed by fitting a quadratic to
the histogram max function) and a non fitted (computed by just taking
the max of the histogram function). This flag specifies which one to
use if both are present.
"""
self.detectorArea = detectorArea
"""
Allows user to specify area of detector by which flux is multiplied.
Must be in compatible units as flux (i.e. if flux is in
photons / cm^2 / s then detectorArea must be in cm^2). Default is 1.
"""
self.extent=None
"""
Specifies the spatial extent of the pulsar
"""
self.phaseDerivatives = phaseDerivatives
"""
Stores the derivatives of phase with respect to time
phaseDerivatives is a dict which contains the derivatives
of phase with respect to time. The 0th derivative of phase is
simply a constant phase offset (zero by default). The 1st
derivative of phase is frequency. Higher order derivatives reflect
the change in frequency with respect to time.
The dict keys should be integers which indicate the order of the
derivative. e.g.
>>> phaseDerivatives = {0: 0, 1: 29.7417, 2: -3.7184e-19, 3: 1.1949e-20}
(Phase derivatives shown for Crab Nebula pulsar)
"""
self.fittedPhaseDerivatives = None
"""
Alternative phase derivatives, derived by fitting rather than taking max.
"""
self.TZRMJD = TZRMJD
"""
TZRMJD stores the Modified Julian Date (MJD) corresponding
to t=0 seconds (for the purpose of computing the pulsar phase)
"""
self.PEPOCH = PEPOCH
self.observatoryMJDREF = observatoryMJDREF
"""
observatoryMJDREF stores the MJD corresponding to t=0 seconds
for photon observations
"""
self.name = name
"""
name is an arbitrary string that is used to identify the
signal. Can be set to whatever the user wants, since it is only
used for display purposes. Usually set to the astrophysical
signal's name (i.e. "Crab Pulsar" or "PSR B0531+21")
"""
self.avgPhotonFlux = avgPhotonFlux
"""
averagePhotonFlux is the mean value of photons expected to
# arrive from the signal per second (units of photons/second)
"""
self.pulsedFraction = pulsedFraction
"""
pulsedFraction is a value from 0 to 1 which indicates what
percentage of the avgPhotonFlux is "pulsed" and what percentage is
constant/background
"""
self.covRA = None
"""
Spatial covariance of source in right-ascension
"""
self.covDEC = None
"""
Spatial covariance of source in declination
"""
self.covX = None
"""
Cross-correlation of spatial covariance terms
"""
self.fluxERG = None
"""
Energy flux in ergs per unit area
"""
self.fluxKEV=None
"""
Energy flux in KEV per unit area
"""
self.profile=None
"""
profile is a numpy array containing the numerical value of
flux over a single period of the signal.
The profile array contains the signal profile of the pulsar
(or periodic source) being modeled. If the user selected to
normalize the profle, then the profile will be normalized from zero
to one, and then scaled based on the average flux value. If the
profile is not normalized, then the raw values will be used for
computing the signal. If the profile is normalized but no average
flux value is received, a warning will be issued.
"""
# Process the PAR file, if received. Give priority to parameters
# passed directly as init arguments.
if PARFile is not None:
self.PARFile = PARFile
PAR_RA, PAR_Dec = self.processPARFile(PARFile)
if (RA is None):
RA = PAR_RA
if (DEC is None):
DEC = PAR_Dec
# Check to see if fitted phase derivatives were stored. IF not, don't try to use them.
if self.useFitted:
if self.fittedPhaseDerivatives == None:
self.useFitted = False
if np.all([self.covRA, self.covDEC, self.covX]):
self.extent = np.array([
[self.covRA, self.covX],
[self.covX, self.covDEC]
])
# Initialize PointSource with Right ascension and declination values
pointsource.PointSource.__init__(
self, RA, DEC, attitudeStateName=attitudeStateName, extent=self.extent
)
# Check to make sure that we received either a phaseDerivatives dict
# or a value for pulsar period
if (
(self.phaseDerivatives is None) and
(self.fittedPhaseDerivatives is None) and
(pulsarPeriod is None)
):
raise ValueError(
"Not enough timing information was received to initialize a " +
"pulsar signal. One of the following inputs is required: \n" +
"- pulsarPeriod \n" +
"- phaseDerivatives" +
"- PARFile containing frequency information"
)
self.normalizeProfile = normalizeProfile
"""
normalizeProfile is a boolean flag used to indicate whether
the profile is normalized from zero to one. If so, then the signal
should be multiplied by #peakAmplitude.
"""
# Process whatever was passed as the profile
if profile is not None:
self.processProfile(profile, normalizeProfile, movePeakToZero, useProfileColumn)
self.TZeroDiff = 0
"""
TZeroDiff is the time difference between the pulsar's t zero (i.e.
PEPOCH) and the observatory's tZero. This amount is subtracted from
the observatory time in calculations of flux, period, etc. to sync
observatory time with the pulsar parameter's reference time
"""
if (self.PEPOCH is not None) and (self.observatoryMJDREF is not None):
self.TZeroDiff = (self.PEPOCH - self.observatoryMJDREF)*24*60*60
else:
self.TZeroDiff = 0
if correlationStateName is None:
correlationStateName = self.name
# Update the pulsar period and time array.
self.pulsarPeriod = 0
"""
pulsarPeriod is the amouont of time (in seconds) for one
complete pulsar pulse.
"""
if self.useFitted:
self.pulsarPeriod = 1/self.fittedPhaseDerivatives[1]
else:
self.pulsarPeriod = 1/self.phaseDerivatives[1]
self.computeSinglePeriodIntegral()
poissonsource.DynamicPoissonSource.__init__(
self,
self.avgPhotonFlux*detectorArea,
maxFlux=self.peakAmplitude,
correlationStateName=correlationStateName,
startTime=startTime,
useTOAprobability=useTOAprobability
)
return
[docs] def processProfile(
self,
profile,
normalizeProfile=True,
movePeakToZero=True,
useProfileColumn=None
):
r"""
processProfile reads in a series of data points representing the pulsar's pulse profile, and processes it for use in generating signals.
Args:
profle: Either a string pointing to a file, or a list of values representing the profile
normalizeProfile (bool): Determines whether to normalize the profile to be between zero and one.
movePeakToZero (bool): Determines whether the max value of the profile will be shifted to zero. This is fairly standard convention.
useProfileColumn (int): Optionally, determine which column of the profile file is contains the profile information. Different profile files have various numbers of columns. By default, method will use the last column.
"""
# If a string is received, assume this points to a file and try to
# open it and import the data.
if type(profile) is str:
profileArray = np.loadtxt(profile)
if useProfileColumn is None:
profileArray = profileArray[:, len(profileArray[0]) - 1]
else:
profileArray = profileArray[:, int(useProfileColumn)]
profile = profileArray
if normalizeProfile is True:
if (self.avgPhotonFlux is None) or (self.pulsedFraction is None):
raise Warning(
"You are normalizing the profile from zero to one, but " +
"you haven't given values for average photon flux or " +
"pulsed fraction. This will result in a signal which " +
"is most likely not scaled properly."
)
profile = profile - np.min(profile)
profile = profile/np.max(profile)
if movePeakToZero is True:
if np.argmax(profile) != 0:
profile = np.roll(profile, -np.argmax(profile))
profile = np.append(profile, profile[0])
self.profile = profile
self.profileIndex = np.linspace(0, 1, len(self.profile))
return
[docs] def processPARFile(
self,
PARFile,
replaceCurrentValues=False
):
r"""
Read a PAR file passed by the user and change/initialize object attributes accordingly.
`PAR files <http://www.jb.man.ac.uk/~pulsar/Resources/tempo_usage.txt>`_
are a semi-standardized way to store information about pulsars. Most PAR files contain, at a minimum, the name of the pulsar, the sky coordinates of the pulsar (i.e. right ascension and declination) and frequency information about the pulsar. They may also contain other helpful information such as reported glitches, pulsar binary model information, etc. Rather than force the user to enter all this information manually, the object can initialize from a PAR file and gather the relevant information on its own.
In order to further simplify the pulsar object initialization pipeline, this method also processes additional information found in the PAR file which isn't part of the standard PAR file format. This includes information like source extent (i.e. how big is the object in the sky), source flux (photons per second per unit area, energy per second per unit area), pulsed fraction, and pulse profile. This information is not normally included in a PAR file. However, it is relatively easy to add it manually, and this simplifies workflow by keeping all the relevant information about the pulsar in one file.
Args:
PARFile (str): Path to PAR file to be read
replaceCurrentValues (bool): Determines whether previously stored values will be overwritten by values found in the PAR file
Returns:
(float, float): Tuple containing the right ascension and declination of the source found in the PAR file
"""
# Read PAR file, and split into lines
parTextFile = open(PARFile, "r")
lines = parTextFile.read().split('\n')
PARPhaseDerivatives = {0: 0}
PARFittedPhaseDerivatives = None
newProfile = []
for line in lines:
# Split the line into a list of strings, and strip the
# whitespace
splitLine = line.strip().split()
# If the line contains relevant information, store it.
if len(splitLine) > 0:
# FREQUENCY
# If line contains frequency information, the first string
# should be of the format "F<i>" where i is an integer.
if (
(splitLine[0][0] == 'F') and
(len(splitLine[0]) == 2)
):
# Extract the order of derivative of the phase.
# e.g. F0 is the 1st derivative of phase, F1 is the
# 2nd derivative of phase, etc.
freqOrder = int(splitLine[0][1]) + 1
PARPhaseDerivatives[freqOrder] = float(splitLine[1])
# FITTED FREQUENCY
# If line contains fitted frequency information, the first string
# should be of the format "F<i>_f" where i is an integer.
if (
(splitLine[0][0] == 'F') and
(splitLine[0][-2:] == '_f')
):
if PARFittedPhaseDerivatives == None:
PARFittedPhaseDerivatives = {0: 0}
# Extract the order of derivative of the phase.
# e.g. F0 is the 1st derivative of phase, F1 is the
# 2nd derivative of phase, etc.
freqOrder = int(splitLine[0][1]) + 1
PARFittedPhaseDerivatives[freqOrder] = float(splitLine[1])
# RIGHT ASCENSION
elif ((splitLine[0] == 'RAJ') or
(splitLine[0] == 'RA')):
# PAR files store right ascension as HH:MM:SS, so split
# on the ":" character
hmsArray = splitLine[1].split(':')
# print(hmsArray)
PAR_RA = (
sg.hms2rad(
float(hmsArray[0]),
float(hmsArray[1]),
float(hmsArray[2])
)
)
# DECLINATION
elif ((splitLine[0] == 'DECJ') or
(splitLine[0] == 'DEC')):
# Split on ":" and convert to radians
dmsArray = splitLine[1].split(':')
PAR_Dec = (
sg.dms2rad(
float(dmsArray[0]),
float(dmsArray[1]),
float(dmsArray[2])
)
)
# T-zero Mod Julian Date
elif (splitLine[0] == 'TZRMJD'):
PAR_TZRMJD = float(splitLine[1])
if (self.TZRMJD is None) or (replaceCurrentValues is True):
self.TZRMJD = PAR_TZRMJD
# PEPOCH Mod Julian Date
elif (splitLine[0] == 'PEPOCH'):
PAR_PEPOCH = float(splitLine[1])
if (self.PEPOCH is None) or (replaceCurrentValues is True):
self.PEPOCH = PAR_PEPOCH
# Spatial covariance matrix, right ascension
elif (splitLine[0] == 'COV_RA'):
covRA = float(splitLine[1])
if (not self.covRA) or (replaceCurrentValues):
self.covRA = covRA
# Spatial covariance matrix, declination
elif (splitLine[0] == 'COV_DEC'):
covDEC = float(splitLine[1])
if (not self.covDEC) or (replaceCurrentValues):
self.covDEC = covDEC
# Spatial covariance matrix, cross correlation
elif (splitLine[0] == 'COV_X'):
covX = float(splitLine[1])
if (not self.covX) or (replaceCurrentValues):
self.covX = covX
# Pulsar Name
elif ((splitLine[0] == 'PSRJ')
or
(splitLine[0] == 'PSR')):
if (self.name is None) or (replaceCurrentValues is True):
self.name = splitLine[1]
# Energy flux in ergs per unit area
elif (splitLine[0] == 'FLX_ERG'):
if (self.fluxERG is None) or (replaceCurrentValues is True):
self.fluxERG = float(splitLine[1])
# Energy flux in KEV per unit area
elif (splitLine[0] == 'FLX_KEV'):
if (self.fluxKEV is None) or (replaceCurrentValues is True):
self.fluxKEV = float(splitLine[1])
# Photon flux per unit area
elif (splitLine[0] == 'FLX_PH'):
if (self.avgPhotonFlux is None) or (replaceCurrentValues is True):
self.avgPhotonFlux = float(splitLine[1])
# Fraction of photon flux that is pulsed
elif (splitLine[0] == 'P_FRAC'):
if (self.pulsedFraction is None) or (replaceCurrentValues is True):
self.pulsedFraction = float(splitLine[1])
# Profile (assumed to be in order
elif (splitLine[0] == 'PROFILE'):
newProfile.append(float(splitLine[1]))
if (
(self.phaseDerivatives is None) or
(replaceCurrentValues is True)
):
self.phaseDerivatives = PARPhaseDerivatives
if (
(self.fittedPhaseDerivatives is None) or
(replaceCurrentValues is True)
):
self.fittedPhaseDerivatives = PARFittedPhaseDerivatives
if len(newProfile) > 0:
# plt.figure()
# plt.plot(newProfile)
# plt.show(block=False)
# print(self.profile)
if replaceCurrentValues or (not np.any(self.profile)):
self.processProfile(newProfile)
return(PAR_RA, PAR_Dec)
[docs] def computeSinglePeriodIntegral(
self
):
r"""
Compute the integral as a function of time of the pulsar flux.
This will be used later to compute expected value of flux in the case
where time is uncertain. Generally this function only needs to be called once at initialization; once the integral is computed it is stored as an attribute and can be accessed on demand. Integral is computed using trapezoidal integration.
"""
self.singlePeriodIntegral = np.zeros(len(self.profile))
"""
An array containing the numerically integrated flux over a single period
"""
singlePeriodTimeArray = np.linspace(
0, self.pulsarPeriod, len(self.profile)
)
self.profileLen = len(self.profile)
for i in range(len(self.profile)):
self.singlePeriodIntegral[i] = np.trapz(
self.profile[0:i + 1],
singlePeriodTimeArray[0:i + 1],
axis=0
)
# Store the total flux integral over one period
fluxIntegral = self.singlePeriodIntegral[-1]
# Compute the scaling factor based on the average photon flux (if the
# average flux was given)
if self.avgPhotonFlux is not None:
self.photonsPerPeriod = (
self.avgPhotonFlux * self.pulsarPeriod * self.detectorArea
)
"""
Expected number of photons for a given pulsar period
"""
self.peakAmplitude = self.photonsPerPeriod / fluxIntegral
"""
Maximum amplitude of the pulsar signal. Scaled so that the average flux is equal to the value given in initialization
"""
# If some of the flux is unpulsed, scale the peak amplitude
# accordingly and compute a background rate to account for the
# unpulsed portion
if self.pulsedFraction is not None:
self.scaleFactor = self.peakAmplitude * self.pulsedFraction
"""
Amount by which the signal is to be multiplied to get the correct flux values
"""
self.backgroundCountRate = (
self.avgPhotonFlux * self.detectorArea *
(1 - self.pulsedFraction)
)
"""
Flux of the unpulsed portion of the signal. Not to be confused with other background sources.
"""
else:
self.backgroundCountRate = 0
self.scaleFactor = self.peakAmplitude
else:
self.peakAmplitude = np.max(self.profile)
self.scaleFactor = 1.0
self.backgroundCountRate = 0
return
[docs] def getPhase(
self,
observatoryTime
):
r"""
Compute the current phase of the pulsar signal.
This method uses all available frequency/phase information including derivatives. However, it does not check for validity of the time passed. Some PAR files are valid over only a specific region of time, and this function will not verify that the time given is within that time window.
It is up to the user to verify the validity of the results.
Args:
observatoryTime (float): Local time at which flux is to be computed.
Returns:
(float): Current pulsar phase
"""
phase = 0
shiftedObservatoryTime = observatoryTime - self.TZeroDiff
if self.useFitted:
myPhaseDerivatives = self.fittedPhaseDerivatives
else:
myPhaseDerivatives = self.phaseDerivatives
for order in myPhaseDerivatives:
phase = (
phase +
(
myPhaseDerivatives[order] *
np.power(shiftedObservatoryTime, order) /
factorial(order)
)
)
return(phase)
[docs] def getFrequency(
self,
observatoryTime
):
r"""
Compute the current frequency of the pulsar signal
This function uses all available frequency information including derivatives to compute the current frequency of the pulsar signal at the given time. However it does not check for time validity. Some PAR files are valid over only a specified time frame; it is up to the user to verify whether the time given is within that region.
See Also:
:meth:`getPeriod`
Args:
observatoryTime (float): Time for which frequency is to be computed
Returns:
(float): Computed frequency in Hz
"""
frequency = 0
shiftedObservatoryTime = observatoryTime - self.TZeroDiff
if self.useFitted:
myPhaseDerivatives = self.fittedPhaseDerivatives
else:
myPhaseDerivatives = self.phaseDerivatives
for order in myPhaseDerivatives:
if order > 0:
frequency = (
frequency +
(
myPhaseDerivatives[order] *
np.power(shiftedObservatoryTime, order-1)/
factorial(order - 1)
)
)
return frequency
[docs] def getPeriod(
self,
observatoryTime
):
r"""
Compute the current period of the pulsar signal
This function uses all available frequency information including derivatives to compute the current period of the pulsar signal at the given time. However it does not check for time validity. Some PAR files are valid over only a specified time frame; it is up to the user to verify whether the time given is within that region.
See Also:
:meth:`getFrequency`
Args:
observatoryTime (float): Time for which period is to be computed
Returns:
(float): Computed period in seconds
"""
return 1/self.getFrequency(observatoryTime)
[docs] def getSignalMJD(
self,
MJD
):
r"""
getSignalMJD is a wrapper function that returns the photon flux
at a given Modified Julian Date
See Also:
:meth:`getSignal`
Args:
MJD (float): The Modified Jullian Date for which flux is to be returned
Returns:
(float): The signal at the requested date
"""
observatoryTime = self.MJD2seconds(MJD)
return (self.getSignal(observatoryTime))
[docs] def getSignal(
self,
observatoryTime,
tVar=None,
#state=None
):
r"""
getSignal is responsible for returning the photon flux from the
pulsar at a given time
The getSignal method is the method which is responsible for
returning the current value of flux from the signal source at a given
time. If there is uncertainty in the time, then the expected value of
the signal is returned. Uncertainty in time is indicated by the
optional tVar argument.
In every case, the method calls the getPhase method to determine the
current signal source phase. If no uncertainty in time is passed, then
the getPulseFromPhase method is called to lookup/interpolate the
current flux based on the phase and the signal :attr:`profile`.
If a value is passed for tVar, then the process is more complicated.
Rather than simply look up the signal value from the phase, the function
returns an approximation of the expected value of the signal, given the
mean and variance of time.
Normally, the expected value would be computed by integrating the
product of the true distribution of time (probably a normal distribution)
with the flux as a function of time. However, due to the non-analytical
nature of the flux function, the direct computation of this integral is
intractable. To overcome this limitation, the time distributuion is
approximated as a moment-matched uniform distribution. Using this
approximation, the approximate integral may be directly computed simply
by looking up the start and end values of :attr:`singlePeriodIntegral`, and
dividing by the appropriate scaling factor.
Args:
observatoryTime (float): The time for which to compute the signal
tVar (float): The variance of the time estimate (optional)
Returns:
(float): The signal at the requested time, in photons/second
"""
# if state is not None:
# if 'signalDelay' in state:
# delay = state['signalDelay']
# if 'delayVar' in state:
# delayVar = state['delayVar']
# else:
# delayVar = 0
# observatoryTime = observatoryTime + delay
# tVar = tVar + delayVar
# Get the phase corresponding to the current time
phase = self.getPhase(observatoryTime)
# If a value was received for the tVar, then we compute the expected
# value of flux
if tVar is not None:
# Get standard deviation of t
tSigma = np.sqrt(tVar)
# Convert the time standard deviation to phase standard deviation
phaseSigma = tSigma/self.pulsarPeriod
# Check to see if the phase std is bigger than the std
# corresponding to a uniform distribution with support = 1. If so,
# this indicates that we effectively have no meaningful knowledge
# of phase, and can just return the average flux.
if phaseSigma > np.sqrt(1/12):
signal = self.avgPhotonFlux * self.pulsedFraction * self.detectorArea
elif phaseSigma < 1/(100 * self.profileLen):
signal = self.getPulseFromPhase(phase)
else:
phaseFraction = np.mod(phase, 1.0)
upperSigma = phaseFraction + (np.sqrt(12) * phaseSigma / 2)
lowerSigma = phaseFraction - (np.sqrt(12) * phaseSigma / 2)
upperSigmaOffset = 0
lowerSigmaOffset = 0
if upperSigma > 1:
upperSigma = upperSigma - 1
upperSigmaOffset = self.singlePeriodIntegral[-1]
if lowerSigma < 0:
lowerSigma = lowerSigma + 1
lowerSigmaOffset = self.singlePeriodIntegral[-1]
signal = (
upperSigmaOffset +
np.interp(
upperSigma,
self.profileIndex,
self.singlePeriodIntegral
) -
np.interp(
lowerSigma,
self.profileIndex,
self.singlePeriodIntegral)
+ lowerSigmaOffset
)
signal = (
signal /
(np.sqrt(12) * tSigma)
)
signal = signal * self.scaleFactor
else:
signal = self.getPulseFromPhase(phase)
if self.backgroundCountRate is not None:
signal = signal + self.backgroundCountRate
return(signal)
[docs] def signalIntegral(
self,
tStart,
tStop,
state=None
):
r"""
Computes the definite integral of the signal over a given time interval.
This function computes the definite integral of flux over the specified time interval. Mathematically:
.. math::
\Lambda(t_0, T) = \int_{t_0}^T \lambda(t) dT
The function uses the periodic nature of the signal in conjuction with the previously computed :attr:`singlePeriodIntegral` to simplify the computation. Specifically, the integral needs to be computed only once.
This is a precise expression for the number of photons expected during that time interval. For relatively small time intervals, the integral may be reasonably approximated by
.. math::
\Lambda(T, T+\Delta T) = \int_{T}^{T+\Delta T} \lambda(t) dT \approx \lambda(t) \Delta T
In that case, this function is not ideal, as it increases computational expense without much added benefit. However in the case of large time intervals, this function will give more accurate results.
Args:
tStart (float): Start time of the definite integralTStart
tStop (float): Stop time of the definite integral
state: Optionally, a state object which contains a signal delay
Returns:
(float): The definite integral of the signal.
"""
if state is not None:
if 'signalDelay' in state:
delay = state['signalDelay']
tStart = tStart + delay
tStop = tStop + delay
# Get the phase corresponding to the current time
phaseStart = self.getPhase(tStart)
phaseStop = self.getPhase(tStop)
completeCycles = np.floor(phaseStop-phaseStart)
phaseStartFraction = np.mod(phaseStart, 1.0)
phaseStopFraction = np.mod(phaseStop, 1.0)
integralTStart = np.interp(
phaseStartFraction,
self.profileIndex,
self.singlePeriodIntegral
)
integralTStop = np.interp(
phaseStopFraction,
self.profileIndex,
self.singlePeriodIntegral
)
phaseFractionIntegral = integralTStop - integralTStart
if phaseFractionIntegral < 0:
phaseFractionIntegral = (
phaseFractionIntegral + self.singlePeriodIntegral[-1]
)
signalIntegral = (
phaseFractionIntegral + self.singlePeriodIntegral[-1] * completeCycles
)
signalIntegral = signalIntegral * self.scaleFactor
if self.backgroundCountRate is not None:
signalIntegral = (
signalIntegral +
self.backgroundCountRate * (tStop - tStart)
)
return signalIntegral
[docs] def getPulseFromPhase(self,
phase):
r"""
Returns the value of the signal given the signal phase
This function uses the signal :attr:`profile` to compute the value of the signal given the signal phase. It uses simple linear interpolation to find the value.
Args:
phase (float): The phase number of the signal. Phase can be any numerical value; the fractional phase will be computed by taking remainder of the value divided by 1.
Returns:
(float): The value of the signal at the given phase
"""
pFrac = np.mod(phase, 1.0)
signal = np.interp(pFrac, self.profileIndex, self.profile)
signal = signal * self.scaleFactor
return signal
[docs] def MJD2seconds(
self,
MJD
):
r"""
A simple utility function to convert modified Julian date to pulsar time
See Also:
:meth:`seconds2MJD`
Args:
MJD (float): Modified Julian Date
Returns:
(float): Time in seconds
"""
return (MJD - self.PEPOCH) * (24.0 * 60.0 * 60.0)
# return (MJD - self.TZRMJD) * (24.0 * 60.0 * 60.0)
[docs] def seconds2MJD(
self,
seconds
):
r"""
A simple utility function to convert seconds to Modified Julian Date
See Also:
:meth:`MJD2seconds`
Args:
seconds (float): Time in seconds
Returns:
(float): Time in Modified Julian Date
"""
return self.PEPOCH + (seconds/(24.0 * 60.0 * 60.0))
# return self.TZRMJD + (seconds/(24.0 * 60.0 * 60.0))
[docs] def computeAssociationProbability(
self,
measurement,
stateDict,
validationThreshold=0
):
r"""
Computes the probability of association
This method computes the probability of association of a signal given the measurement and prior state. This is a conditional probability based on Bayes rule. Bayes rule is given by
.. math::
\textrm{Pr}\left[A_p | \mathbf{y}\right] = \frac{\textrm{Pr}\left[\mathbf{y}|A_p\right] \textrm{Pr}\left[A_p\right]}{\textrm{Pr}\left[\mathbf{y}\right]}
Generally, the denominator is not directly computed and will be divided out.
For a periodic point source, there are two possible measurements: angle of arrival and time of arrival. If both of these measurements are included in the measurement dictionary, the probability will be computed as
.. math::
\textrm{Pr}\left[A_p | (\alpha, t)\right] = \mathcal{N}\left[\alpha, \mathbf{S}_\alpha \right] \lambda(t) \bar{\lambda}
These sub-components of the probability are not computed here; rather they are computed by the :class:`~modest.signals.poissonsource.PoissonSource` and :class:`~modest.signals.pointsource.PointSource` parent classes from which this class inherits. So, if something needs to be changed about the computation of association probability, it is likely that it needs to be changed in the parent classes.
If either of these measurements is not included in the measurement dict, then that component of the probability will be set to 1.
Note:
The probability computed here is not an "absolute" probability. It *only* has meaning in relation to other probabilities. In order for this number to be meaningful, it should be normalized with all other possible association probabilities so that their sum is 1. Also note that the probability returns contains :math:`\delta \alpha` and :math:`\delta t` differential terms. These differential terms are divided out in the normalization process. *All the other probabilities must contain these same terms for the comparison to be meaningful and the normalization to be valid.*
Args:
measurement (dict): A dictionary containing the measured values and their variances
stateDict (dict): A dictionary containing the substates and their values
validationThreshold (float): Unused
Returns:
(float): The probability of association
"""
anglePR = pointsource.PointSource.computeAssociationProbability(
self,
measurement,
stateDict,
validationThreshold
)
poisPR = poissonsource.DynamicPoissonSource.computeAssociationProbability(
self,
measurement,
stateDict
)
# poisPR = 1.0
myPr = (anglePR * poisPR * self.flux)
if isnan(myPr):
raise ValueError(
'Computed NaN probability. Components: AOA %s, TOA %s, Flux %s'
%(anglePR, poisPR, self.peakAmplitude)
)
return myPr
[docs] def plot(self,
nPeriods=1,
tVar=None,
figureHandle=None,
nPoints=1000
):
"""
Plots the pulsar profile
This is a simple visualization function that generates a plot of the pulsar profile over a specified number of pulsar periods. Optionally, you can also specify a time uncertainty to see how the *effective* profile changes with time uncertainty.
Args:
nPeriods (float): Number of periods to plot
tVar (float): Variance of time uncertainty. If set to None then no uncertainty is included.
figureHandle: Optional handle to an existing figure
nPoints (int): Number of points to plot
"""
if figureHandle is None:
plt.figure()
tArray = np.linspace(
0,
self.pulsarPeriod * nPeriods,
nPoints * nPeriods)
signalArray = np.zeros(nPoints * nPeriods)
for index in range(len(signalArray)):
signalArray[index] = self.getSignal(tArray[index], tVar)
plt.plot(tArray, signalArray)
plt.show(block=False)
[docs] def generatePhotonArrivals(
self,
tMax,
t0=0,
timeOffset=None,
spacecraft=None,
):
"""
Generate photon arrival measurements
This function generates photon arrivals from the pulsar signal based on the parameters of the simulation. The
"""
if not timeOffset:
timeOffset = 0
nCandidates = np.int((tMax - t0) * self.peakAmplitude * 1.1)
# Generate a batch of candidate arrival times (more efficient than generating on the fly)
candidateTimeArray = np.random.exponential(1.0/self.peakAmplitude, nCandidates)
selectionVariableArray = np.random.uniform(0, 1, nCandidates)
photonMeasurements = []
photonArrivalTimes = []
tLastCandidate = t0
# # If we have a position, then we want to get the signal at T0 at that
# # position, not the SSB. So, shift T0 accordingly.
# if position is not None:
# rangeDeltaT = self.unitVec().dot(position(t0)) / self.speedOfLight()
# tLastCandidate = tLastCandidate + rangeDeltaT
attitude = None
if spacecraft is not None:
if hasattr(spacecraft, 'dynamics'):
myRangeFunction = spacecraft.dynamics.getRangeFunction(
self.unitVec(),
tMax
)
attitude = spacecraft.dynamics.attitude
else:
def myRangeFunction(t):
return (0)
if hasattr(spacecraft, 'detector'):
TOA_StdDev = spacecraft.detector.TOA_StdDev
AOA_StdDev = spacecraft.detector.AOA_StdDev
FOV = spacecraft.detector.FOV
else:
TOA_StdDev = None
AOA_StdDev = None
FOV = None
else:
def myRangeFunction(t):
return 0
TOA_StdDev = None
AOA_StdDev = None
attitude = None
FOV = None
candidateIndex = 0
# plt.figure()
# plt.plot(np.linspace(0,tMax,1000), myRangeFunction(np.linspace(0,tMax,1000)))
# plt.show(block=False)
while tLastCandidate < tMax:
# Draw the next arrival time and selection variable from our
# pre-generated arrays
if candidateIndex < len(candidateTimeArray):
nextCandidate = candidateTimeArray[candidateIndex]
selectionVariable = selectionVariableArray[candidateIndex]
# If we run out, generate more on the fly
else:
nextCandidate = np.random.exponential(1.0/self.peakAmplitude)
selectionVariable = np.random.uniform(0, 1)
tNextCandidate = (
tLastCandidate +
nextCandidate
)
rangeDeltaT = myRangeFunction(tNextCandidate) / self.speedOfLight()
# rangeDeltaT = 0
currentFluxNormalized = (
self.getSignal(tNextCandidate + rangeDeltaT)/self.peakAmplitude
)
# This if statement uses a uniform variable to determine whether
# the next generated photon arrival time is a real photon arrival
# time.
if selectionVariable <= currentFluxNormalized:
# if position is not None:
# rangeDeltaT = (
# self.unitVec().dot(position(tNextCandidate - rangeDeltaT)) /
# self.speedOfLight()
# )
# else:
# rangeDeltaT = 0
# newPhotonArrivalTime = tNextCandidate - rangeDeltaT
# photonArrivalTimes.append(tNextCandidate - rangeDeltaT)
newPhotonArrivalTime = tNextCandidate
photonArrivalTimes.append(tNextCandidate)
if TOA_StdDev:
newPhotonArrivalTime = (
newPhotonArrivalTime + np.random.normal(scale=TOA_StdDev)
)
measurementDict = {
't': {
'value': newPhotonArrivalTime - timeOffset,
'var': np.square(TOA_StdDev)
},
'name': self.name
}
else:
measurementDict = {
't': {'value': newPhotonArrivalTime - timeOffset},
'name': self.name
}
if attitude is not None:
measurementDict.update(
self.generateArrivalVector(
attitude(newPhotonArrivalTime),
AOA_StdDev=AOA_StdDev)
)
# measurementDict = {
# 't': {'value': newPhotonArrivalTime},
# 'unitVec': {'value': unitVecMeas},
# 'RA': {'value': RaMeas},
# 'DEC': {'value': DecMeas},
# 'name': self.name
# }
photonMeasurements.append(measurementDict)
tLastCandidate = tNextCandidate
candidateIndex = candidateIndex + 1
return photonMeasurements
# @staticmethod
# def hms2rad(h, m, s):
# hours = h + m / 60.0 + s / 3600.0
# return 2.0 * np.pi * hours / 24.0
# @method
# def dms2rad(d, m, s):
# degrees = d + m / 60.0 + s / 3600.0
# return 2.0 * np.pi * degrees / 360.0
[docs] def speedOfLight(
self
):
return (299792.458)
[docs] def FWHM(self):
maxValue = self.profile[0]
halfMax = maxValue / 2
upperIndex = 0
lowerIndex = len(self.profile) - 1
while self.profile[upperIndex] > halfMax:
upperIndex += 1
upperIndex += -1
upperHalfIndex = (
upperIndex +
(
(halfMax - self.profile[upperIndex]) /
(
self.profile[upperIndex + 1] -
self.profile[upperIndex]
)
)
)
while self.profile[lowerIndex] > halfMax:
lowerIndex += -1
lowerHalfIndex = (
lowerIndex +
(
(halfMax - self.profile[lowerIndex]) /
(
self.profile[lowerIndex + 1] -
self.profile[lowerIndex]
)
)
)
lowerHalfIndex = lowerHalfIndex - len(self.profile)
FWHMIndex = upperHalfIndex - lowerHalfIndex
return FWHMIndex * self.pulsarPeriod / len(self.profile)