Source code for edges.sim.antenna_beam_factor

"""Functions for computing the antenna beam factor."""

from typing import Literal, Self

import astropy.coordinates as apc
import attrs
import numpy as np
import scipy.interpolate as spi
from astropy import units as u

from edges import const
from edges import modeling as mdl
from edges.io.serialization import hickleable
from edges.sim import sky_models
from edges.sim.beams import Beam

from .. import types as tp
from .simulate import sky_convolution_generator


[docs] @hickleable @attrs.define(slots=False) class BeamFactor: """A non-interpolated beam factor. This class holds the attributes necessary to compute beam factors at particular LSTs and frequencies, namely the antenna temperature (beam-weighted integral of the sky) and the same at a particular reference frequency. We hold these separately to enable computing the beam factor in different ways from these basic quantities. Attributes ---------- frequencies: np.ndarray The frequencies at which the beam-weighted sky integrals are defined. lsts: np.ndarray The LSTs at which the beam-weighted sky integrals are defined. reference_frequency: float The reference frequency. antenna_temp: np.ndarray The beam-weighted sky integrals at each frequency and LST. antenna_temp_ref: np.ndarray The beam-weighted sky integrals at the reference frequency and each LST. loss_fraction: np.ndarray The fraction of the sky signal lost below the horizon. meta A dictionary of metadata. """ frequencies: np.ndarray = attrs.field(converter=np.asarray) lsts: np.ndarray = attrs.field(converter=np.asarray) reference_frequency: float = attrs.field( converter=float, ) antenna_temp: np.ndarray = attrs.field(converter=np.asarray) antenna_temp_ref: np.ndarray = attrs.field(converter=np.asarray) loss_fraction: np.ndarray | None = attrs.field(default=None) meta: dict = attrs.field(factory=dict, converter=dict) @property def nfreq(self) -> int: """The number of frequencies in the beam factor.""" return len(self.frequencies) @property def nlst(self) -> int: """The number of LSTs in the beam factor.""" return len(self.lsts) @frequencies.validator def _check_frequencies(self, attribute: attrs.Attribute, value: np.ndarray) -> None: if not np.all(np.diff(value) > 0): raise ValueError("Frequencies must be monotonically increasing.") @lsts.validator def _check_lsts(self, attribute: attrs.Attribute, value: np.ndarray) -> None: # LSTs can wrap around 24 hours, but only once. if np.sum(np.diff(value) < 0) > 1: raise ValueError("LSTs must be monotonically increasing.") @reference_frequency.validator def _check_reference_frequency( self, attribute: attrs.Attribute, value: float ) -> None: if value <= 0: raise ValueError("Reference frequency must be positive.") @antenna_temp.validator @loss_fraction.validator def _check_antenna_temp( self, attribute: attrs.Attribute, value: np.ndarray ) -> None: if attribute.name == "loss_fraction" and value is None: return if value.ndim != 2: raise ValueError(f"{attribute.name} must be a 2D array.") if value.shape != (self.nlst, self.nfreq): raise ValueError(f"{attribute.name} must have shape (nlst, nfreq).") @antenna_temp.validator def _check_positive(self, attribute: str, value: np.ndarray) -> None: if np.any(value < 0): raise ValueError( f"Antenna temperature must be positive, got min of {np.nanmin(value)}" ) @antenna_temp_ref.validator def _check_antenna_temp_ref(self, attribute: str, value: np.ndarray) -> None: if value.ndim not in (1, 2): raise ValueError("Reference antenna temperature must be a 1D or 2D array.") if value.ndim == 1 and value.shape != (self.nlst,): raise ValueError( "If Reference antenna temperature is 1D, it must have shape (nlst,)." f"Got shape {value.shape} instead of {(self.nlst,)}" ) if value.ndim == 2 and value.shape != (self.nlst, self.nfreq): raise ValueError( "If Reference antenna temperature is 2D, it must have shape " "(nlst, nfreq)." ) if np.any(value < 0): raise ValueError( "Reference antenna temperature must be positive, " f"got min of {np.nanmin(value)}" )
[docs] def at_lsts(self, lsts: np.ndarray, interp_kind: int | str = "cubic") -> Self: """Return a new BeamFactor at the given LSTs.""" d = attrs.asdict(self) lst_like = [ k for k, v in d.items() if isinstance(v, np.ndarray) and v.shape[0] == self.nlst if k != "lsts" ] these_lsts = self.lsts % 24 while np.any(these_lsts < these_lsts[0]): these_lsts[these_lsts < these_lsts[0]] += 24 use_lsts = lsts % 24 use_lsts[use_lsts < these_lsts[0]] += 24 these_lsts = np.append(these_lsts, these_lsts[0] + 24) out = {} for k in lst_like: if d[k].ndim == 2: val = np.vstack((d[k], d[k][0])) elif d[k].ndim == 1: val = np.concatenate((d[k], [d[k][0]])) out[k] = spi.interp1d(these_lsts, val, axis=0, kind=interp_kind)(use_lsts) return attrs.evolve(self, lsts=lsts, **out)
[docs] def between_lsts(self, lst0: float, lst1: float) -> Self: """Return a new BeamFactor including only LSTs between those given. Parameters ---------- lst0 Lower edge of lsts in hours. lst1 Upper edge of lsts in hours. """ if lst1 < lst0: lst1 += 24 these_lsts = self.lsts % 24 these_lsts[these_lsts < lst0] += 24 mask = np.logical_and(these_lsts >= lst0, these_lsts < lst1) if not np.any(mask): raise ValueError( f"BeamFactor does not contain any LSTs between {lst0} and {lst1}." ) d = attrs.asdict(self) lst_like = [ k for k, v in d.items() if isinstance(v, np.ndarray) and v.shape[0] == self.nlst # and v.ndim == 2 if k != "lsts" ] out = {k: getattr(self, k)[mask] for k in lst_like} return attrs.evolve(self, lsts=these_lsts[mask], **out)
[docs] def get_beam_factor( self, model: mdl.Model, freqs: np.ndarray | None = None ) -> np.ndarray: """Return the beam factor as a function of LST and frequency. This will always be normalized to unity at the reference frequency, via a model fit. """ if freqs is None: freqs = self.frequencies bf = (self.antenna_temp.T / self.antenna_temp_ref.T).T fixed_model = model.at(x=self.frequencies) ref_bf = np.zeros(self.nlst) out = np.zeros((self.nlst, len(freqs))) for i, ibf in enumerate(bf): fit = fixed_model.fit(ibf) ref_bf = fit.evaluate(self.reference_frequency) out[i] = fit.evaluate(freqs) / ref_bf return out
[docs] def get_mean_beam_factor( self, model: mdl.Model, freqs: np.ndarray | None ) -> np.ndarray: """Return the mean beam factor over all LSTs.""" return np.mean(self.get_beam_factor(model, freqs), axis=0)
[docs] def get_integrated_beam_factor( self, model: mdl.Model, freqs: np.ndarray | None = None, **fit_kwargs ) -> np.ndarray: """Return the beam factor integrated over the LST range. This is the ratio of summed LSTs, rather than the sum of the ratio at each LST, i.e. it is not the same as the mean beam factor over the LST range. It is normalized to unity at the reference frequency via a model fit. """ if freqs is None: freqs = self.frequencies bf = np.sum(self.antenna_temp, axis=0) / np.sum(self.antenna_temp_ref, axis=0) fit = model.fit(self.frequencies, bf, **fit_kwargs) return fit.evaluate(freqs) / fit.evaluate(self.reference_frequency)
[docs] def compute_antenna_beam_factor( beam: Beam, sky_model: sky_models.SkyModel, ground_loss: np.ndarray | None = None, f_low: tp.FreqType = 0 * u.MHz, f_high: tp.FreqType = np.inf * u.MHz, normalize_beam: bool = True, index_model: sky_models.IndexModel = sky_models.GaussianIndex(), lsts: np.ndarray | None = None, reference_frequency: tp.FreqType | None = None, beam_smoothing: bool = True, smoothing_model: mdl.Model = mdl.Polynomial(n_terms=12), interp_kind: Literal[ "linear", "nearest", "slinear", "cubic", "quintic", "pchip", "spline", "sphere-spline", ] = "cubic", lst_progress: bool = True, freq_progress: bool = True, location: apc.EarthLocation = const.edges_location, sky_at_reference_frequency: bool = True, use_astropy_azel: bool = True, ) -> BeamFactor: """ Calculate the antenna beam factor. Parameters ---------- beam A :class:`Beam` object. ground_loss An array of ground-loss values for the beam, shape (Nfreq,). f_low Minimum frequency to keep in the simulation (frequencies otherwise defined by the beam). f_high Maximum frequency to keep in the simulation (frequencies otherwise defined by the beam). normalize_beam Whether to normalize the beam to be maximum unity. sky_model A sky model to use. index_model An :class:`IndexModel` to use to generate different frequencies of the sky model. twenty_min_per_lst How many periods of twenty minutes fit into each LST bin. save_dir The directory in which to save the output beam factor. save_fname The filename to save the output beam factor. reference_frequency The frequency to take as the "reference", i.e. where the chromaticity will be by construction unity. lst_progress Whether to show a progress bar over the LSTs. freq_progress Whether to show a progress bar over the frequencies. location The location of the telescope. Returns ------- beam_factor : :class`BeamFactor` instance """ beam = beam.between_freqs(f_low, f_high) if lsts is None: lsts = np.arange(0, 24, 0.5) # Get index of reference frequency if reference_frequency is None: reference_frequency = (f_high + f_low) / 2 indx_ref_freq = np.argmin(np.abs(beam.frequency - reference_frequency)) # Don't reset the reference frequency. Alan uses the discrete ref frequency # to get the weighted sky temp, then models that, and divides by the model at # non-discrete ref frequency to normalize. antenna_temperature_above_horizon = np.zeros((len(lsts), len(beam.frequency))) if sky_at_reference_frequency: convolution_ref = np.zeros((len(lsts),)) else: convolution_ref = np.zeros((len(lsts), len(beam.frequency))) loss_fraction = np.zeros((len(lsts), len(beam.frequency))) beamsums = np.zeros((len(lsts), len(beam.frequency))) for ( lst_idx, freq_idx, temperature, _, sky, bm, _, npix_no_nan, _az, _el, _interp, ) in sky_convolution_generator( lsts, beam=beam, sky_model=sky_model, index_model=index_model, normalize_beam=normalize_beam, beam_smoothing=beam_smoothing, smoothing_model=smoothing_model, ground_loss=ground_loss, interp_kind=interp_kind, lst_progress=lst_progress, freq_progress=freq_progress, location=location, ref_freq_idx=indx_ref_freq, use_astropy_azel=use_astropy_azel, ): # 'temperature' is the mean beam-weighted foreground above the horizon (single # float for this lst, freq). If normalize_beam is True, it's # normalized by the integral of the beam (at each freq) # 'sky' is the full sky model for this LST and freq # 'bm' is the full beam model for this freq (same for all LSTs) antenna_temperature_above_horizon[lst_idx, freq_idx] = temperature if freq_idx == indx_ref_freq: ref_bm = bm.copy() ref_sky = sky.copy() # This updates once per LST, on the first frequency iteration """ sky_at_reference_frequency is a toggle between Eq-4 and Eq-A1 from Sims+23 """ if sky_at_reference_frequency: convolution_ref[lst_idx] = np.nansum(ref_bm * ref_sky) / npix_no_nan if not sky_at_reference_frequency: convolution_ref[lst_idx, freq_idx] = np.nansum(ref_bm * sky) / npix_no_nan beamsums[lst_idx, freq_idx] = np.nansum(bm) / npix_no_nan # Loss fraction loss_fraction[lst_idx, freq_idx] = 1 - np.nansum(bm) / npix_no_nan return BeamFactor( frequencies=beam.frequency.to_value("MHz").astype(float), lsts=np.array(lsts).astype(float), antenna_temp=( antenna_temperature_above_horizon if normalize_beam else antenna_temperature_above_horizon / beamsums ), antenna_temp_ref=( convolution_ref if normalize_beam else (convolution_ref.T / beamsums[:, indx_ref_freq]).T ), loss_fraction=loss_fraction, reference_frequency=reference_frequency.to_value("MHz"), meta={ "beam_file": str(beam.raw_file), "simulator": beam.simulator, "f_low": f_low.to_value("MHz"), "f_high": f_high.to_value("MHz"), "normalize_beam": bool(normalize_beam), "sky_model": sky_model.name, "index_model": str(index_model), "rotation_from_north": float(90), }, )