Source code for edges_analysis.beams

"""Beam models and chromaticity corrections."""

from __future__ import annotations

import astropy.coordinates as apc
import astropy.time as apt
import attrs
import logging
import numpy as np
import scipy.interpolate as spi
from astropy import units as u
from edges_cal import FrequencyRange
from edges_cal import modelling as mdl
from edges_cal import types as tp
from edges_cal.tools import vld_unit
from hickleable import hickleable
from pathlib import Path
from tqdm import tqdm
from typing import Literal

from . import _coordinates_alan as crda
from . import const
from . import coordinates as coords
from . import sky_models
from .calibration.loss import ground_loss
from .config import config
from .data import BEAM_PATH

logger = logging.getLogger(__name__)

# Reference UTC observation time. At this time, the LST is 0.1666 (00:10 Hrs LST) at the
# EDGES location. NOTE: this is used by default, but can be changed by the user anywhere
# it is used.
REFERENCE_TIME = apt.Time("2014-01-01T09:39:42", location=const.edges_location)


[docs] @attrs.define(kw_only=True) class Beam: beam: np.ndarray = attrs.field() frequency: tp.FreqType = attrs.field(validator=vld_unit("frequency")) elevation: np.ndarray = attrs.field(converter=np.asarray) azimuth: np.ndarray = attrs.field(converter=np.asarray) simulator: str | None = attrs.field(default=None, converter=str) instrument: str | None = attrs.field(default=None, converter=str) raw_file: str | None = attrs.field(default=None, converter=str)
[docs] @classmethod def from_hfss( cls, path: tp.PathLike, frequency: tp.FreqType, linear: bool = True, theta_min: float = 0, theta_max: float = 180, theta_resolution: float = 1, phi_min: float = 0, phi_max: float = 359, phi_resolution: float = 1, ) -> Beam: """ Create a Beam object from a HFSS file. Parameters ---------- path Path to the file. Use :meth:`resolve_file` to get the absolute path from a a relative path (starting with ':'). linear Whether the beam values are in linear units (or decibels) theta_min, theta_max Min/Max of the zenith angle (degrees) theta_resolution Resolution of the zenith angle. phi_min, phi_max Min/Max of the azimuth angles (degrees) phi_resolution The resolution of the azimuth angle. Returns ------- beam The beam object. """ d = np.genfromtxt(path, skip_header=1, delimiter=",") theta = np.arange(theta_min, theta_max + theta_resolution, theta_resolution) phi = np.arange(phi_min, phi_max + phi_resolution, phi_resolution) beam_map = np.zeros((len(theta), len(phi))) for i in range(len(theta)): mask = d[:, 1] == theta[i] this_phi = d[mask, 0] this_power = d[mask, 2] this_phi[this_phi < 0] += 360 this_phi, unique_inds = np.unique(this_phi, axis=0, return_index=True) this_power = this_power[unique_inds] this_power = this_power[np.argsort(this_phi)] if not linear: this_power = 10 ** (this_power / 10) this_power[np.isnan(this_power)] = 0 beam_map[i, :] = this_power return Beam( frequency=np.array([frequency]), elevation=theta, azimuth=phi, beam=beam_map, simulator="hfss", raw_file=path, )
[docs] @classmethod def from_wipld(cls, path: tp.PathLike, az_antenna_axis: float = 0) -> Beam: """Read a WIPL-D beam.""" with open(path) as fn: file_length = 0 number_of_frequencies = 0 flag_columns = False frequencies_list = [] for line in fn: file_length += 1 if line[2] == ">": number_of_frequencies += 1 frequencies_list.append(float(line[19:32])) elif not flag_columns: line_splitted = line.split() number_of_columns = len(line_splitted) flag_columns = True rows_per_frequency = ( file_length - number_of_frequencies ) / number_of_frequencies output = np.zeros( ( int(number_of_frequencies), int(rows_per_frequency), int(number_of_columns), ) ) frequencies = np.array(frequencies_list) with open(path) as fn: i = -1 for line in fn: if line[2] == ">": i += 1 j = -1 else: j += 1 line_splitted = line.split() line_array = np.array(line_splitted, dtype=float) output[i, j, :] = line_array # Re-arrange data phi_u = np.unique(output[0, :, 0]) theta_u = np.unique(output[0, :, 1]) beam = np.zeros((len(frequencies), len(theta_u), len(phi_u))) for i in range(len(frequencies)): out_2D = output[i, :, :] phi = out_2D[:, 0] theta = ( 90 - out_2D[:, 1] ) # theta is zero at the zenith, and goes to 180 deg gain = out_2D[:, 6] theta_u = np.unique(theta) it = np.argsort(theta_u) theta_a = theta_u[it] for j in range(len(theta_a)): phi_j = phi[theta == theta_a[j]] gain_j = gain[theta == theta_a[j]] ip = np.argsort(phi_j) gp = gain_j[ip] beam[i, j, :] = gp # Flip beam from theta to elevation beam_maps = beam[:, ::-1, :] # Change coordinates from theta/phi, to AZ/EL el = np.arange(0, 91) az = np.arange(0, 360) # Shifting beam relative to true AZ (referenced at due North) beam_maps_shifted = cls.shift_beam_maps(az_antenna_axis, beam_maps) return Beam( frequency=frequencies, azimuth=az, elevation=el, beam=beam_maps_shifted, simulator="wipl-d", raw_file=path, )
[docs] @classmethod def from_ideal(cls, delta_f=2, f_low=40, f_high=200, delta_az=1, delta_el=1): """Create an ideal beam that is completely unity.""" freq = np.arange(f_low, f_high, delta_f) az = np.arange(0, 360, delta_az) el = np.arange(0, 90 + 0.1 * delta_el, delta_el) return Beam( frequency=freq * u.MHz, azimuth=az, elevation=el, beam=np.ones((len(freq), len(el), len(az))), simulator="ideal", )
[docs] @classmethod def from_feko(cls, path: str | Path, az_antenna_axis: float = 0) -> Beam: """ Read a FEKO beam file. Parameters ---------- filename The path to the file. az_antenna_axis The azimuth of the primary antenna axis, in degrees. Returns ------- beam_maps A ``(Nfreq, Nel, Naz)`` array giving values of the beam. Note that elevation and azimuth are always in 1-degree increments. freq The frequencies at which the beam is defined. """ filename = Path(path) data = np.genfromtxt(str(filename)) frequency = [] with open(filename) as fl: for line in fl: if line.startswith("#FREQUENCY"): line = line.split("MHz")[0].strip().split(" ") frequency.append(float(line[-1])) freq = FrequencyRange(np.array(frequency) * u.MHz) # Loading data and convert to linear representation beam_maps = np.zeros((len(frequency), 91, 360)) for i in range(len(frequency)): beam_maps[i] = (10 ** (data[(i * 360) : ((i + 1) * 360), 2::] / 10)).T # Shifting beam relative to true AZ (referenced at due North) # Due to angle of orientation of excited antenna panels relative to due North beam_maps = cls.shift_beam_maps(az_antenna_axis, beam_maps) return Beam( frequency=freq.freq, beam=beam_maps, azimuth=np.arange(0, 360), elevation=np.arange(0, 91), simulator="feko", raw_file=path, )
[docs] @classmethod def from_cst( cls, file_name_prefix: tp.PathLike, f_low: int = 40, f_high: int = 100, freq_p: int = 61, theta_p: float = 181, phi_p: float = 361, az_antenna_axis: float = 0, ) -> Beam: """ Read a CST beam file. Parameters ---------- filename The path to the file. az_antenna_axis The azimuth of the primary antenna axis, in degrees. f_low, f_high lower and higher frequency bounds freq_p Number of frequency points in the simulation file. theta_p Number of zenith angle points in the simulation file. phi_p Number of azimuth points in the simulation file. Returns ------- beam The beam object. """ phi_t = np.zeros((freq_p, theta_p * phi_p)) frequency = np.linspace(f_low, f_high, freq_p) beam_square = np.zeros((freq_p, theta_p, phi_p)) res = (f_high - f_low) / (freq_p - 1) for i in range(freq_p): n = int(i * res + f_low) with open(f"{file_name_prefix}farfield (f={n}) [1].txt", "rb") as fc_file: for x, line in enumerate(fc_file): if x > 1: check = 0 for o in range(len(line)): if line[o] != "": check = check + 1 if check == 3: phi_t[i][x - 2] = float(line.split()[2]) for i in range(freq_p): for x in range(phi_p): beam_square[i, :, x] = phi_t[i, x * theta_p : (x + 1) * theta_p] beam_square[i, :, 0 : int(phi_p / 2)] = beam_square[ i, :, int(phi_p / 2) : phi_p - 1 ] freq = FrequencyRange(np.array(frequency) * u.MHz) beam_square[:, 91:, :] = 0 beam_maps = np.flip(beam_square[:, :91, :360], axis=1) # Shifting beam relative to true AZ (referenced at due North) # Due to angle of orientation of excited antenna panels relative to due North beam_maps = cls.shift_beam_maps(az_antenna_axis, beam_maps) return Beam( frequency=freq.freq, beam=beam_maps, azimuth=np.arange(0, 360), elevation=np.arange(0, 91), simulator="cst", raw_file=file_name_prefix, )
[docs] @classmethod def from_feko_raw( cls, file_name_prefix: tp.PathLike, ext: str = "txt", f_low: int = 40, f_high: int = 100, freq_p: int = 61, theta_p: float = 181, phi_p: float = 361, az_antenna_axis: float = 0, ) -> Beam: """ Read a FEKO beam file. Parameters ---------- filename The path to the file. az_antenna_axis The azimuth of the primary antenna axis, in degrees. f_low, f_high lower and higher frequency bounds freq_p Number of frequency points in the simulation file. theta_p Number of zenith angle points in the simulation file. phi_p Number of azimuth points in the simulation file. Returns ------- beam The beam object. """ beam_square = np.zeros((freq_p, theta_p, phi_p)) frequency = np.linspace(f_low, f_high, freq_p) f1 = open(f"{file_name_prefix}_0-90.{ext}") f2 = open(f"{file_name_prefix}_91-180.{ext}") f3 = open(f"{file_name_prefix}_181-270.{ext}") f4 = open(f"{file_name_prefix}_271-360.{ext}") z = ( theta_p * 91 + 10 ) # ---> change this to no.of theta * no.of phi + No.of header lines for index, line in enumerate(f1): if index % z == 0: co = 0 if index % z >= 10: x = list(map(float, line.split())) beam_square[int(index / z), co % theta_p, int(co / theta_p)] = 10 ** ( x[8] / 10 ) co += 1 z = theta_p * 90 + 10 # for index, line in enumerate(f2): if index % z == 0: co = 0 if index % z >= 10: x = list(map(float, line.split())) beam_square[int(index / z), co % theta_p, int(co / theta_p) + 91] = ( 10 ** (x[8] / 10) ) co += 1 for index, line in enumerate(f3): if index % z == 0: co = 0 if index % z >= 10: x = list(map(float, line.split())) beam_square[int(index / z), co % theta_p, int(co / theta_p) + 181] = ( 10 ** (x[8] / 10) ) co += 1 for index, line in enumerate(f4): if index % z == 0: co = 0 if index % z >= 10: x = list(map(float, line.split())) beam_square[int(index / z), co % theta_p, int(co / theta_p) + 271] = ( 10 ** (x[8] / 10) ) co += 1 freq = FrequencyRange(np.array(frequency) * u.MHz) beam_square[:, 90:, :] = 0 beam_maps = np.flip(beam_square[:, :91, :360], axis=1) # Shifting beam relative to true AZ (referenced at due North) # Due to angle of orientation of excited antenna panels relative to due North beam_maps = cls.shift_beam_maps(az_antenna_axis, beam_maps) return Beam( frequency=freq.freq, beam=beam_maps, azimuth=np.arange(0, 360), elevation=np.arange(0, 91), simulator="feko", raw_file=file_name_prefix, )
[docs] @classmethod def get_beam_path(cls, band: str, kind: str | None = None) -> Path: """Get a standard path to a beam file.""" pth = BEAM_PATH / band / "default.txt" if not kind else kind + ".txt" if not pth.exists(): raise FileNotFoundError(f"No beam exists for band={band}.") return pth
[docs] def select_freqs(self, indx: tp.Sequence[int]) -> Beam: """Select a subset of frequencies.""" if not hasattr(indx, "__len__"): indx = [indx] return attrs.evolve( self, frequency=self.frequency[indx], beam=self.beam[indx], )
[docs] def at_freq( self, freq: tp.FreqType, model: mdl.Model = mdl.Polynomial( n_terms=13, transform=mdl.ScaleTransform(scale=75.0) ), **fit_kwargs, ) -> Beam: """ Interpolate the beam to a new set of frequencies. Parameters ---------- freq Frequencies to interpolate to. Returns ------- beam The Beam object at the new frequencies. """ if len(self.frequency) < 3: raise ValueError( "Can't freq-interpolate beam that has fewer than three frequencies." ) # Frequency interpolation interp_beam = np.zeros((len(freq), len(self.elevation), len(self.azimuth))) cached_model = model.at(x=self.frequency.to_value("MHz")) for i, bm in enumerate(self.beam.T): for j, b in enumerate(bm): model_fit = cached_model.fit(ydata=b, **fit_kwargs) interp_beam[:, j, i] = model_fit.evaluate(freq.to_value("MHz")) return Beam( frequency=freq, azimuth=self.azimuth, elevation=self.elevation, beam=interp_beam, )
[docs] def smoothed( self, model: mdl.Model = mdl.Polynomial(n_terms=12), **fit_kwargs ) -> Beam: """ Smoothes the beam within its same set of frequencies. ---------- Returns ------- beam The Beam object smoothed over its same frequencies. """ if len(self.frequency) < 3: raise ValueError( "Can't freq-interpolate beam that has fewer than three frequencies." ) # Frequency smoothing smooth_beam = np.zeros_like(self.beam) cached_model = model.at(x=self.frequency.to_value("MHz")) for i, bm in enumerate(self.beam.T): for j, b in enumerate(bm): model_fit = cached_model.fit(ydata=b, **fit_kwargs) smooth_beam[:, j, i] = model_fit.evaluate() return Beam( frequency=self.frequency, azimuth=self.azimuth, elevation=self.elevation, beam=smooth_beam, )
[docs] @staticmethod def shift_beam_maps(az_antenna_axis: float, beam_maps: np.ndarray): """Rotate beam maps around an axis. Parameters ---------- az_antenna_axis The aximuth angle of the antenna axis. beam_maps Beam maps as a function of frequency, za and az. Returns ------- beam maps Array of the same shape as the input, but rotated. """ if az_antenna_axis < 0: index = -az_antenna_axis bm1 = beam_maps[:, :, index::] bm2 = beam_maps[:, :, 0:index] return np.append(bm1, bm2, axis=2) elif az_antenna_axis > 0: index = az_antenna_axis bm1 = beam_maps[:, :, 0:(-index)] bm2 = beam_maps[:, :, (360 - index) : :] return np.append(bm2, bm1, axis=2) else: return beam_maps
[docs] @classmethod def resolve_file( cls, path: tp.PathLike, band: str | None = None, configuration: str = "default", simulator: str = "feko", ) -> Path: """Resolve a file path to a standard location.""" if str(path) == ":": if band is None: raise ValueError("band must be given if path starts with a colon (:)") # Get the default beam file. return BEAM_PATH / band / f"{configuration}.txt" elif str(path).startswith(":"): if band is None: raise ValueError("band must be given if path starts with a colon (:)") # Use a beam file in the standard directory. return ( Path(config["paths"]["beams"]).expanduser() / f"{band}/simulations/{simulator}/{str(path)[1:]}" ).absolute() else: return Path(path).absolute()
[docs] @classmethod def from_file( cls, band: str | None, simulator: str = "feko", beam_file: tp.PathLike = ":", configuration: str = "default", rotation_from_north: float = 90, ) -> Beam: """Read a beam from file.""" beam_file = cls.resolve_file(beam_file, band, configuration, simulator) if simulator == "feko": # rotation_from_north -= 90 out = cls.from_feko(beam_file, az_antenna_axis=rotation_from_north) elif simulator == "wipl-d": # Beams from WIPL-D out = cls.from_wipld(beam_file, rotation_from_north) elif simulator == "hfss": out = cls.from_hfss(beam_file) else: raise ValueError(f"Unknown value for simulator: '{simulator}'") out.instrument = band return out
# @lru_cache(maxsize=1000)
[docs] def angular_interpolator( self, freq_indx: int, interp_kind: Literal[ "linear", "nearest", "slinear", "quintic", "pchip", "spline", "sphere-spline", ] = "sphere-spline", ) -> callable[[np.ndarray, np.ndarray], np.ndarray]: """Return a callable function that interpolates the beam. The returned function has the signature ``interp(az, el)``, where ``az`` is azimuth in degrees, and ``el`` is elevation in degrees. They may be arrays, in which case they should be the same length. """ beam = self.beam[freq_indx] if interp_kind == "sphere-spline": el = self.elevation * np.pi / 180 + np.pi / 2 if el[-1] > 0.999 * np.pi: el = el[:-1] beam = beam[:-1] spl = spi.RectSphereBivariateSpline( el, self.azimuth * np.pi / 180, beam, pole_values=(None, beam[-1]), pole_exact=True, ) return lambda az, el: spl( el * np.pi / 180 + np.pi / 2, az * np.pi / 180, grid=False ) else: az = np.concatenate([self.azimuth, [self.azimuth[0] + 360]]) beam = np.hstack((beam, beam[:, 0][:, None])) if interp_kind == "spline": spl = spi.RectBivariateSpline( az, self.elevation, beam.T, kx=3, ky=3, s=0 ) return lambda az, el: spl(az, el, grid=False) else: spl = spi.RegularGridInterpolator( (az, self.elevation), beam.T, method=interp_kind, ) return lambda az, el: spl(np.array([az, el]).T)
[docs] def between_freqs( self, low: tp.FreqType = 0 * u.MHz, high: tp.Freqtype = np.inf * u.MHz ) -> Beam: """Return a new :class:`Beam` object restricted to a given frequency range.""" mask = (self.frequency >= low) & (self.frequency <= high) return attrs.evolve(self, frequency=self.frequency[mask], beam=self.beam[mask])
[docs] def get_beam_solid_angle(self) -> float: """Calculate the integrated beam solid angle.""" sin_theta = np.cos(self.elevation * (np.pi / 180)) sin_theta_2D = np.tile(sin_theta, (len(self.azimuth), 1)).T beam_integration = np.sum(self.beam * sin_theta_2D, axis=(1, 2)) d_el = self.elevation[1] - self.elevation[0] d_az = self.azimuth[1] - self.azimuth[0] return d_el * d_az * (np.pi / 180) ** 2 * beam_integration
[docs] @hickleable() @attrs.define(slots=False) class BeamFactor: """A non-interpolated beam factor.""" frequencies: np.ndarray = attrs.field(converter=np.asarray) lsts: np.ndarray = attrs.field(converter=np.asarray) reference_frequency: float = attrs.field( converter=float, ) antenna_temp: np.ndarray = attrs.field(converter=np.asarray) antenna_temp_ref: np.ndarray = attrs.field(converter=np.asarray) loss_fraction: np.ndarray | None = attrs.field(default=None) meta: dict[str, tp.Any] = attrs.field(factory=dict, converter=dict) @property def nfreq(self) -> int: """The number of frequencies in the beam factor.""" return len(self.frequencies) @property def nlst(self) -> int: """The number of LSTs in the beam factor.""" return len(self.lsts) @frequencies.validator def _check_frequencies(self, attribute: attrs.Attribute, value: np.ndarray) -> None: if not np.all(np.diff(value) > 0): raise ValueError("Frequencies must be monotonically increasing.") @lsts.validator def _check_lsts(self, attribute: attrs.Attribute, value: np.ndarray) -> None: # LSTs can wrap around 24 hours, but only once. if np.sum(np.diff(value) < 0) > 1: raise ValueError("LSTs must be monotonically increasing.") @reference_frequency.validator def _check_reference_frequency( self, attribute: attrs.Attribute, value: float ) -> None: if value <= 0: raise ValueError("Reference frequency must be positive.") @antenna_temp.validator @loss_fraction.validator def _check_antenna_temp( self, attribute: attrs.Attribute, value: np.ndarray ) -> None: if attribute.name == "loss_fraction" and value is None: return if value.ndim != 2: raise ValueError(f"{attribute.name} must be a 2D array.") if value.shape != (self.nlst, self.nfreq): raise ValueError(f"{attribute.name} must have shape (nlst, nfreq).") @antenna_temp.validator def _check_positive(self, attribute: str, value: np.ndarray) -> None: if np.any(value < 0): raise ValueError( f"Antenna temperature must be positive, got min of {np.nanmin(value)}" ) @antenna_temp_ref.validator def _check_antenna_temp_ref(self, attribute: str, value: np.ndarray) -> None: if value.ndim not in (1, 2): raise ValueError("Reference antenna temperature must be a 1D or 2D array.") if value.ndim == 1 and value.shape != (self.nlst,): raise ValueError( "If Reference antenna temperature is 1D, it must have shape (nlst,)." ) if value.ndim == 2 and value.shape != (self.nlst, self.nfreq): raise ValueError( "If Reference antenna temperature is 2D, it must have shape " "(nlst, nfreq)." ) if np.any(value < 0): raise ValueError( "Reference antenna temperature must be positive, " f"got min of {np.nanmin(value)}" )
[docs] def at_lsts(self, lsts: np.ndarray, interp_kind: int | str = "cubic") -> BeamFactor: """Return a new BeamFactor at the given LSTs.""" d = attrs.asdict(self) lst_like = [ k for k, v in d.items() if isinstance(v, np.ndarray) and v.shape[0] == self.nlst and v.ndim == 2 ] these_lsts = self.lsts % 24 while np.any(these_lsts < these_lsts[0]): these_lsts[these_lsts < these_lsts[0]] += 24 use_lsts = lsts % 24 use_lsts[use_lsts < these_lsts[0]] += 24 these_lsts = np.append(these_lsts, these_lsts[0] + 24) out = {} for k in lst_like: val = np.vstack((d[k], d[k][0])) out[k] = spi.interp1d(these_lsts, val, axis=0, kind=interp_kind)(use_lsts) return attrs.evolve(self, lsts=lsts, **out)
[docs] def between_lsts(self, lst0: float, lst1: float) -> BeamFactor: """Return a new BeamFactor including only LSTs between those given. Parameters ---------- lst0 Lower edge of lsts in hours. lst1 Upper edge of lsts in hours. """ if lst1 < lst0: lst1 += 24 these_lsts = self.lsts % 24 these_lsts[these_lsts < lst0] += 24 mask = np.logical_and(these_lsts >= lst0, these_lsts < lst1) if not np.any(mask): raise ValueError( f"BeamFactor does not contain any LSTs between {lst0} and {lst1}." ) d = attrs.asdict(self) lst_like = [ k for k, v in d.items() if isinstance(v, np.ndarray) and v.shape[0] == self.nlst and v.ndim == 2 ] out = {k: getattr(self, k)[mask] for k in lst_like} return attrs.evolve(self, lsts=these_lsts[mask], **out)
[docs] def get_beam_factor( self, model: mdl.Model, freqs: np.ndarray | None = None ) -> np.ndarray: """Return the beam factor as a function of LST and frequency. This will always be normalized to unity at the reference frequency, via a model fit. """ if freqs is None: freqs = self.frequencies bf = (self.antenna_temp.T / self.antenna_temp_ref.T).T fixed_model = model.at(x=self.frequencies) ref_bf = np.zeros(self.nlst) out = np.zeros((self.nlst, len(freqs))) for i, ibf in enumerate(bf): fit = fixed_model.fit(ibf) ref_bf = fit.evaluate(self.reference_frequency) out[i] = fit.evaluate(freqs) / ref_bf return out
[docs] def get_mean_beam_factor( self, model: mdl.Model, freqs: np.ndarray | None ) -> np.ndarray: """Return the mean beam factor over all LSTs.""" return np.mean(self.get_beam_factor(model, freqs), axis=0)
[docs] def get_integrated_beam_factor( self, model: mdl.Model, freqs: np.ndarray | None = None ) -> np.ndarray: """Return the beam factor integrated over the LST range. This is the ratio of summed LSTs, rather than the sum of the ratio at each LST, i.e. it is not the same as the mean beam factor over the LST range. It is normalized to unity at the reference frequency via a model fit. """ if freqs is None: freqs = self.frequencies bf = np.sum(self.antenna_temp, axis=0) / np.sum(self.antenna_temp_ref, axis=0) fit = model.fit(self.frequencies, bf) return fit.evaluate(freqs) / fit.evaluate(self.reference_frequency)
[docs] def sky_convolution_generator( lsts: np.ndarray, ground_loss_file: str, beam: Beam, sky_model: sky_models.SkyModel, index_model: sky_models.IndexModel, normalize_beam: bool, beam_smoothing: bool, smoothing_model: mdl.Model, location: apc.EarthLocation = const.edges_location, ref_time: apt.Time = REFERENCE_TIME, interp_kind: Literal[ "linear", "nearest", "slinear", "cubic", "quintic", "pchip", "spline", "sphere-spline", ] = "sphere-spline", lst_progress: bool = True, freq_progress: bool = True, ref_freq_idx: int = 0, use_astropy_azel: bool = True, ): """ Iterate through given LSTs and generate a beam*sky product at each freq and LST. This is a generator, so it will yield a single item at a time (to save on memory). Parameters ---------- lsts The LSTs at which to evaluate the convolution. ground_loss_file A path to a file containing ground loss information. beam The beam to convolve. sky_model The sky model to convolve index_model The spectral index model of the sky model. normalize_beam Whether to ensure the beam is properly normalised. beam_interpolation Whether to smooth over freq axis interp_kind The kind of interpolation to use for the beam. "spline" uses :class:`scipy.interpolate.RectBivariateSpline` and "sphere-spline" uses :class:`scipy.interpolate.RectSphereBivariateSpline`. All other options use :class:`scipy.interpolate.RegularGridInterpolator`. with the given kind as ``method``. use_astropy_azel Whether to use the astropy coordinate system for azimuth and elevation. If False, compute the az/el using Alan's method. Yields ------ i The LST enumerator j The frequency enumerator mean_conv_temp The mean temperature after multiplying by the beam (above the horizon) conv_temp An array containing the temperature after multiuplying by the beam in each pixel above the horizon. sky An array containing the sky temperature in pixel above the horizon. beam An array containing the interpolatedbeam in pixels above the horizon time The local time at each LST. n_pixels The total number of pixels that are not masked. Examples -------- Use this function as follows: >>> for i, j, mean_t, conv_t, sky, bm, time, npix in sky_convolution_generator(): >>> print(conv_t) """ if beam_smoothing: beam = beam.smoothed(smoothing_model) if ground_loss_file is not None: ground_gain = ground_loss( ground_loss_file, band=beam.instrument, freq=beam.frequency.to_value("MHz") ) else: ground_gain = np.ones(beam.frequency.size) # Get the local times corresponding to the given LSTs times = coords.lsts_to_times(lsts, ref_time, location) beam_above_horizon = np.full(sky_model.coords.shape, np.nan) interpolators = {} for lst_idx, time in tqdm(enumerate(times), unit="LST", disable=not lst_progress): # Transform Galactic coordinates of Sky Model to Local coordinates if use_astropy_azel: altaz = sky_model.coords.transform_to( apc.AltAz(location=location, obstime=time) ) az = np.asarray(altaz.az.deg) el = np.asarray(altaz.alt.deg) else: ra, dec = crda.galactic_to_radec( sky_model.coords.galactic.b.deg, sky_model.coords.galactic.l.deg ) az, el = crda.radec_azel_from_lst( lsts[lst_idx] * np.pi / 12, ra, dec, location.lat.rad ) az *= 180 / np.pi el *= 180 / np.pi # Number of pixels over FULL SKY (4pi) in the sky model n_pix_tot = len(el) # Selecting coordinates above the horizon horizon_mask = el > 0 az_above_horizon = az[horizon_mask] el_above_horizon = el[horizon_mask] # Loop over frequency # Using np.roll means we start at the reference frequency and loop around. # Starting at ref freq (if there is one) means that we can use the # reference beam with other frequencies. for freq_idx in tqdm( np.roll(range(len(beam.frequency)), -ref_freq_idx), unit="Frequency", disable=not freq_progress, ): if freq_idx not in interpolators: interpolators[freq_idx] = beam.angular_interpolator( freq_idx, interp_kind=interp_kind ) sky_map = sky_model.at_freq( beam.frequency[freq_idx].to_value("MHz"), index_model=index_model, ) sky_map[~horizon_mask] = np.nan beam_above_horizon *= np.nan try: beam_above_horizon[horizon_mask] = interpolators[freq_idx]( az_above_horizon, el_above_horizon ) except ValueError as e: raise ValueError( f"az min/max: {np.min(az_above_horizon), np.max(az_above_horizon)}." f" el min/max: {np.min(el_above_horizon), np.max(el_above_horizon)}" ) from e # Weight the beam by the pixel resolution of the sky model. beam_above_horizon *= sky_model.pixel_res # Number of pixels above the horizon that are used. n_pix_ok = np.sum(~np.isnan(beam_above_horizon)) # Number of pixels in the whole sky, but not counting pixels that are # above horizon and nan. n_pix_tot_no_nan = n_pix_tot - (len(el_above_horizon) - n_pix_ok) if normalize_beam: solid_angle = np.nansum(beam_above_horizon) / n_pix_tot_no_nan beam_above_horizon *= ground_gain[freq_idx] / solid_angle antenna_temperature_above_horizon = beam_above_horizon * sky_map yield ( lst_idx, freq_idx, np.nansum(antenna_temperature_above_horizon) / n_pix_tot_no_nan, antenna_temperature_above_horizon, sky_map, beam_above_horizon, time, n_pix_tot_no_nan, az, el, interpolators[freq_idx], )
[docs] def simulate_spectra( beam: Beam, ground_loss_file: [str, Path] = ":", f_low: [None, float] = 0, f_high: [None, float] = np.inf, normalize_beam: bool = True, sky_model: sky_models.SkyModel = sky_models.Haslam408(), index_model: sky_models.IndexModel = sky_models.ConstantIndex(), lsts: np.ndarray = None, beam_smoothing: bool = True, smoothing_model: mdl.Model = mdl.Polynomial(n_terms=12), interp_kind: Literal[ "linear", "nearest", "slinear", "cubic", "quintic", "pchip", "spline", "sphere-spline", ] = "sphere-spline", ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """ Simulate global spectra from sky and beam models. Parameters ---------- band The band of the antenna (low, mid, high). beam A :class:`Beam` object. ground_loss_file A file pointing to a ground-loss model. By default, gets the default ground loss model for the given ``band``. f_low Minimum frequency to keep in the simulation (frequencies otherwise defined by the beam). f_high Maximum frequency to keep in the simulation (frequencies otherwise defined by the beam). normalize_beam Whether to normalize the beam to be maximum unity. sky_model A sky model to use. index_model An :class:`IndexModel` to use to generate different frequencies of the sky model. lsts The LSTs at which to simulate Returns ------- antenna_temperature_above_horizon The antenna temperature for pixels above the horizon, shape (Nlst, Nfreq) freq The frequencies at which the simulation is defined. lst The LSTs at which the sim is defined. """ beam = beam.between_freqs(f_low, f_high) if lsts is None: lsts = np.arange(0, 24, 0.5) antenna_temperature_above_horizon = np.zeros((len(lsts), len(beam.frequency))) for i, j, temperature, _, _, _, _, _, _, _, _ in sky_convolution_generator( lsts, ground_loss_file, beam, sky_model, index_model, normalize_beam, beam_smoothing, smoothing_model, interp_kind=interp_kind, ): antenna_temperature_above_horizon[i, j] = temperature return antenna_temperature_above_horizon, beam.frequency, lsts
[docs] def antenna_beam_factor( beam: Beam, ground_loss_file: tp.PathLike | None = ":", f_low: tp.FreqType = 0 * u.MHz, f_high: tp.Freqtype = np.inf * u.MHz, normalize_beam: bool = True, sky_model: sky_models.SkyModel = sky_models.Haslam408(), index_model: sky_models.IndexModel = sky_models.GaussianIndex(), lsts: np.ndarray | None = None, reference_frequency: tp.FreqType | None = None, beam_smoothing: bool = True, smoothing_model: mdl.Model = mdl.Polynomial(n_terms=12), interp_kind: Literal[ "linear", "nearest", "slinear", "cubic", "quintic", "pchip", "spline", "sphere-spline", ] = "sphere-spline", lst_progress: bool = True, freq_progress: bool = True, location: apc.EarthLocation = const.edges_location, sky_at_reference_frequency: bool = True, use_astropy_azel: bool = True, ): """ Calculate the antenna beam factor. Parameters ---------- beam A :class:`Beam` object. ground_loss_file A file pointing to a ground-loss model. By default, gets the default ground loss model for the given ``band``. f_low Minimum frequency to keep in the simulation (frequencies otherwise defined by the beam). f_high Maximum frequency to keep in the simulation (frequencies otherwise defined by the beam). normalize_beam Whether to normalize the beam to be maximum unity. sky_model A sky model to use. index_model An :class:`IndexModel` to use to generate different frequencies of the sky model. twenty_min_per_lst How many periods of twenty minutes fit into each LST bin. save_dir The directory in which to save the output beam factor. save_fname The filename to save the output beam factor. reference_frequency The frequency to take as the "reference", i.e. where the chromaticity will be by construction unity. lst_progress Whether to show a progress bar over the LSTs. freq_progress Whether to show a progress bar over the frequencies. location The location of the telescope. Returns ------- beam_factor : :class`BeamFactor` instance """ beam = beam.between_freqs(f_low, f_high) if lsts is None: lsts = np.arange(0, 24, 0.5) # Get index of reference frequency if reference_frequency is None: reference_frequency = (f_high + f_low) / 2 indx_ref_freq = np.argmin(np.abs(beam.frequency - reference_frequency)) # Don't reset the reference frequency. Alan uses the discrete ref frequency # to get the weighted sky temp, then models that, and divides by the model at # non-discrete ref frequency to normalize. # reference_frequency = beam.frequency[indx_ref_freq] antenna_temperature_above_horizon = np.zeros((len(lsts), len(beam.frequency))) if sky_at_reference_frequency: convolution_ref = np.zeros((len(lsts),)) else: convolution_ref = np.zeros((len(lsts), len(beam.frequency))) loss_fraction = np.zeros((len(lsts), len(beam.frequency))) beamsums = np.zeros((len(lsts), len(beam.frequency))) for ( lst_idx, freq_idx, temperature, _, sky, bm, _, npix_no_nan, az, el, interp, ) in sky_convolution_generator( lsts, beam=beam, sky_model=sky_model, index_model=index_model, normalize_beam=normalize_beam, beam_smoothing=beam_smoothing, smoothing_model=smoothing_model, ground_loss_file=ground_loss_file, interp_kind=interp_kind, lst_progress=lst_progress, freq_progress=freq_progress, location=location, ref_freq_idx=indx_ref_freq, use_astropy_azel=use_astropy_azel, ): # 'temperature' is the mean beam-weighted foreground above the horizon (single # float for this lst, freq). If normalize_beam is True, it's # normalized by the integral of the beam (at each freq) # 'sky' is the full sky model for this LST and freq # 'bm' is the full beam model for this freq (same for all LSTs) antenna_temperature_above_horizon[lst_idx, freq_idx] = temperature if freq_idx == indx_ref_freq: ref_bm = bm.copy() # This updates once per LST, on the first frequency iteration if sky_at_reference_frequency: convolution_ref[lst_idx] = np.nansum(ref_bm * sky) / npix_no_nan if not sky_at_reference_frequency: convolution_ref[lst_idx, freq_idx] = np.nansum(ref_bm * sky) / npix_no_nan beamsums[lst_idx, freq_idx] = np.nansum(bm) / npix_no_nan # Loss fraction loss_fraction[lst_idx, freq_idx] = 1 - np.nansum(bm) / npix_no_nan out = BeamFactor( frequencies=beam.frequency.to_value("MHz").astype(float), lsts=np.array(lsts).astype(float), antenna_temp=( antenna_temperature_above_horizon if normalize_beam else antenna_temperature_above_horizon / beamsums ), antenna_temp_ref=( convolution_ref if normalize_beam else (convolution_ref.T / beamsums[:, indx_ref_freq]).T ), loss_fraction=loss_fraction, reference_frequency=reference_frequency.to_value("MHz"), meta={ "beam_file": str(beam.raw_file), "simulator": beam.simulator, "f_low": f_low.to_value("MHz"), "f_high": f_high.to_value("MHz"), "normalize_beam": bool(normalize_beam), "sky_model": sky_model.name, "index_model": str(index_model), "rotation_from_north": float(90), }, ) return out