"""Simulation functions for ideal sky observations."""
from typing import Literal
import astropy.coordinates as apc
import astropy.time as apt
import numpy as np
from astropy import units as un
from pygsdata import coordinates as gscrd
from read_acq import _coordinates as crda
from tqdm import tqdm
from .. import const
from .. import modeling as mdl
from . import sky_models
from .beams import Beam
# Reference UTC observation time. At this time, the LST is 0.1666 (00:10 Hrs LST) at the
# EDGES location. NOTE: this is used by default, but can be changed by the user anywhere
# it is used.
REFERENCE_TIME = apt.Time("2014-01-01T09:39:42", location=const.edges_location)
[docs]
def sky_convolution_generator(
lsts: np.ndarray,
beam: Beam,
sky_model: sky_models.SkyModel,
index_model: sky_models.IndexModel,
normalize_beam: bool,
beam_smoothing: bool,
smoothing_model: mdl.Model,
ground_loss: np.ndarray | None = None,
location: apc.EarthLocation = const.KNOWN_TELESCOPES["edges-low"].location,
ref_time: apt.Time = REFERENCE_TIME,
interp_kind: Literal[
"linear",
"nearest",
"slinear",
"cubic",
"quintic",
"pchip",
"spline",
"sphere-spline",
] = "sphere-spline",
lst_progress: bool = True,
freq_progress: bool = True,
ref_freq_idx: int = 0,
use_astropy_azel: bool = True,
):
"""
Iterate through given LSTs and generate a beam*sky product at each freq and LST.
This is a generator, so it will yield a single item at a time (to save on memory).
Parameters
----------
lsts
The LSTs at which to evaluate the convolution.
ground_loss
An array of ground-loss values for the beam, shape (Nfreq,).
beam
The beam to convolve.
sky_model
The sky model to convolve
index_model
The spectral index model of the sky model.
normalize_beam
Whether to ensure the beam is properly normalised.
beam_interpolation
Whether to smooth over freq axis
interp_kind
The kind of interpolation to use for the beam. "spline" uses
:class:`scipy.interpolate.RectBivariateSpline` and "sphere-spline" uses
:class:`scipy.interpolate.RectSphereBivariateSpline`. All other options use
:class:`scipy.interpolate.RegularGridInterpolator`. with the given kind as
``method``.
use_astropy_azel
Whether to use the astropy coordinate system for azimuth and elevation. If
False, compute the az/el using Alan's method.
Yields
------
i
The LST enumerator
j
The frequency enumerator
mean_conv_temp
The mean temperature after multiplying by the beam (above the horizon)
conv_temp
An array containing the temperature after multiuplying by the beam in each pixel
above the horizon.
sky
An array containing the sky temperature in pixel above the horizon.
beam
An array containing the interpolatedbeam in pixels above the horizon
time
The local time at each LST.
n_pixels
The total number of pixels that are not masked.
Examples
--------
Use this function as follows:
>>> for i, j, mean_t, conv_t, sky, bm, time, npix in sky_convolution_generator():
>>> print(conv_t)
"""
if beam_smoothing:
beam = beam.smoothed(smoothing_model)
if ground_loss is None:
ground_gain = np.ones(len(beam.frequency))
else:
ground_gain = np.asarray(ground_loss)
# Get the local times corresponding to the given LSTs
times = gscrd.lsts_to_times(lsts * un.hourangle, ref_time, location)
beam_above_horizon = np.full(sky_model.coords.shape, np.nan)
interpolators = {}
for lst_idx, time in tqdm(
enumerate(times), unit="LST", disable=not lst_progress, total=len(times)
):
# Transform Galactic coordinates of Sky Model to Local coordinates
if use_astropy_azel:
altaz = sky_model.coords.transform_to(
apc.AltAz(location=location, obstime=time)
)
az = np.asarray(altaz.az.deg)
el = np.asarray(altaz.alt.deg)
else:
ra, dec = crda.galactic_to_radec(
sky_model.coords.galactic.b.deg, sky_model.coords.galactic.l.deg
)
az, el = crda.radec_azel_from_lst(
lsts[lst_idx] * np.pi / 12, ra, dec, location.lat.rad
)
az *= 180 / np.pi
el *= 180 / np.pi
# Number of pixels over FULL SKY (4pi) in the sky model
n_pix_tot = len(el)
# Selecting coordinates above the horizon
horizon_mask = el > 0
az_above_horizon = az[horizon_mask]
el_above_horizon = el[horizon_mask]
# Loop over frequency
# Using np.roll means we start at the reference frequency and loop around.
# Starting at ref freq (if there is one) means that we can use the
# reference beam with other frequencies.
for freq_idx in tqdm(
np.roll(range(len(beam.frequency)), -ref_freq_idx),
unit="Frequency",
disable=not freq_progress,
):
if freq_idx not in interpolators:
interpolators[freq_idx] = beam.angular_interpolator(
freq_idx, interp_kind=interp_kind
)
sky_map = sky_model.at_freq(
beam.frequency[freq_idx].to_value("MHz"),
index_model=index_model,
)
sky_map[~horizon_mask] = np.nan
beam_above_horizon *= np.nan
try:
beam_above_horizon[horizon_mask] = interpolators[freq_idx](
az_above_horizon, el_above_horizon
)
except ValueError as e:
raise ValueError(
f"az min/max: {np.min(az_above_horizon), np.max(az_above_horizon)}."
f" el min/max: {np.min(el_above_horizon), np.max(el_above_horizon)}"
) from e
# Weight the beam by the pixel resolution of the sky model.
beam_above_horizon *= sky_model.pixel_res
# Number of pixels above the horizon that are used.
n_pix_ok = np.sum(~np.isnan(beam_above_horizon))
# Number of pixels in the whole sky, but not counting pixels that are
# above horizon and nan.
n_pix_tot_no_nan = n_pix_tot - (len(el_above_horizon) - n_pix_ok)
if normalize_beam:
solid_angle = np.nansum(beam_above_horizon) / n_pix_tot_no_nan
beam_above_horizon *= ground_gain[freq_idx] / solid_angle
antenna_temperature_above_horizon = beam_above_horizon * sky_map
yield (
lst_idx,
freq_idx,
np.nansum(antenna_temperature_above_horizon) / n_pix_tot_no_nan,
antenna_temperature_above_horizon,
sky_map,
beam_above_horizon,
time,
n_pix_tot_no_nan,
az,
el,
interpolators[freq_idx],
)
[docs]
def simulate_spectra(
beam: Beam,
sky_model: sky_models.SkyModel,
ground_loss: np.ndarray | None = None,
f_low: float | None = 0,
f_high: float | None = np.inf,
normalize_beam: bool = True,
index_model: sky_models.IndexModel = sky_models.ConstantIndex(),
lsts: np.ndarray = None,
beam_smoothing: bool = True,
smoothing_model: mdl.Model = mdl.Polynomial(n_terms=12),
interp_kind: Literal[
"linear",
"nearest",
"slinear",
"cubic",
"quintic",
"pchip",
"spline",
"sphere-spline",
] = "sphere-spline",
use_astropy_azel: bool = True,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Simulate global spectra from sky and beam models.
Parameters
----------
band
The band of the antenna (low, mid, high).
beam
A :class:`Beam` object.
ground_loss
An array of ground-loss values for the beam, shape (Nfreq,).
f_low
Minimum frequency to keep in the simulation (frequencies otherwise defined by
the beam).
f_high
Maximum frequency to keep in the simulation (frequencies otherwise defined by
the beam).
normalize_beam
Whether to normalize the beam to be maximum unity.
sky_model
A sky model to use.
index_model
An :class:`IndexModel` to use to generate different frequencies of the sky
model.
lsts
The LSTs at which to simulate
Returns
-------
antenna_temperature_above_horizon
The antenna temperature for pixels above the horizon, shape (Nlst, Nfreq)
freq
The frequencies at which the simulation is defined.
lst
The LSTs at which the sim is defined.
"""
beam = beam.between_freqs(f_low, f_high)
if lsts is None:
lsts = np.arange(0, 24, 0.5)
antenna_temperature_above_horizon = np.zeros((len(lsts), len(beam.frequency)))
for i, j, temperature, _, _, _, _, _, _, _, _ in sky_convolution_generator(
lsts=lsts,
ground_loss=ground_loss,
beam=beam,
sky_model=sky_model,
index_model=index_model,
normalize_beam=normalize_beam,
beam_smoothing=beam_smoothing,
smoothing_model=smoothing_model,
interp_kind=interp_kind,
use_astropy_azel=use_astropy_azel,
):
antenna_temperature_above_horizon[i, j] = temperature
return antenna_temperature_above_horizon, beam.frequency, lsts