Source code for edges.analysis.calibrate

"""Module defining calibration routines for field data in EDGES."""

from collections.abc import Callable
from pathlib import Path

import numpy as np
from astropy import units as un
from pygsdata import GSData, gsregister

from .. import modeling as mdl
from .. import types as tp
from ..cal import (
    Calibrator,
)
from ..cal.sparams import ReflectionCoefficient
from ..sim import antenna_beam_factor
from ..sim.antenna_beam_factor import BeamFactor


[docs] @gsregister("calibrate") def apply_noise_wave_calibration( data: GSData, calibrator: Calibrator | tp.PathLike, antenna_s11: ReflectionCoefficient, tload: float | None = None, tns: float | None = None, ) -> GSData: """Apply noise-wave calibration to data. This function requires a :class:`edges.cal.cal_coefficients.Calibrator` object (or a path to a file containing such an object) which must be created beforehand. The antenna S11 used is found automatically by searching for the file that has the closest match to the time of the data. This can be constrained by passing options that match the regex pattern for the S11 files. Parameters ---------- data Data to be calibrated. calobs Calibrator object or path to file containing calibrator object. antenna_s11 """ if data.data_unit not in ("uncalibrated", "uncalibrated_temp"): raise ValueError("Data must be uncalibrated to apply calibration!") if data.data_unit == "uncalibrated_temp" and (tload is None or tns is None): raise ValueError( "You need to supply tload and tns if data_unit is uncalibrated_temp" ) if data.nloads != 1: raise ValueError("Can only apply noise-wave calibration to single load data!") if data.data_unit == "uncalibrated_temp": q = (data.data - tload) / tns else: q = data.data new_data = calibrator.calibrate_q(q, ant_s11=antenna_s11.s11, freqs=data.freqs) if data.model is not None: qmodel = ( (data.model - tload) / tns if data.data_unit == "uncalibrated_temp" else data.model ) resids = new_data - calibrator.calibrate_q( qmodel, ant_s11=antenna_s11.s11, freq=data.freqs ) else: resids = None return data.update( data=new_data.to_value("K"), data_unit="temperature", residuals=resids )
[docs] @gsregister("calibrate") def apply_loss_correction( data: GSData, ambient_temp: tp.TemperatureType, loss: np.ndarray | None = None, loss_function: Callable | None = None, **kwargs, ) -> GSData: """Apply a loss-correction to data. Parameters ---------- data The GSData object on which to apply the loss-correction. ambient_temp The ambient temperature at which to apply the loss-correction. loss An array of losses, where the size of the array must be equal to the number of frequencies in the data. If None, a loss function is used to compute the losses. loss_function A function to compute the loss. The function must accept an array of frequencies as its first argument, and may accept arbitrary other keyword arguments, which will can be passed as kwargs to this function. Either this or loss must be specified. Notes ----- Loss functions can be stacked, either by multiplying the losses before passing them to this function, or by calling this function multiple times, once for each loss. """ if loss is None and loss_function is None: raise ValueError("Either loss or loss_function must be provided!") if loss is None: loss = loss_function(data.freqs, **kwargs) if data.data_unit != "temperature": raise ValueError("Data must be temperature to apply antenna loss correction!") a = ambient_temp.to_value(un.K) spec = (data.data - np.outer(a, (1 - loss))) / loss return data.update( data=spec, data_unit="temperature", residuals=None, )
[docs] @gsregister("calibrate") def apply_beam_factor_directly(data: GSData, beam_file: str | Path) -> GSData: """Apply a beam correction factor from a file directly to the data. This function multiplies the data by the beam correction factor from the provided beamfile. It handles data with a single load and updates the data unit to "temperature". Parameters ---------- data The GSData object containing the data to correct. beamfile The path to the beamfile containing the correction factors. The correction factors should be in the fourth column of the csv file, and should have a size equal to the number of frequencies in the data. Returns ------- data A new GSData object with the corrected data and residuals. Raises ------ NotImplementedError If the data contains more than one load. """ if len(data.loads) > 1: raise NotImplementedError( "Can only apply beam correction to data with a single load" ) new_data = data.data.copy() resids = data.residuals.copy() if data.residuals is not None else None bf = np.loadtxt(beam_file) new_data *= bf[:, 3] if resids is not None: resids *= bf[:, 3] return data.update(data=new_data, residuals=resids, data_unit="temperature")
[docs] @gsregister("calibrate") def apply_beam_correction( data: GSData, beam: str | Path | antenna_beam_factor.BeamFactor, freq_model: mdl.Model, integrate_before_ratio: bool = True, oversample_factor: int = 5, resample_beam_lsts: bool = True, lsts: np.ndarray | None = None, cut_to_data_lsts: bool = True, ) -> GSData: """Apply beam correction to the data. This always applies the beam correction to each time sample in the data. If you want to average the data *before* applying the beam correction, you must average the data before applying this function to it. The input beam factor object should cover the full range of LSTs included in the data itself. The beam factor object is defined at a set of LSTs, and by default, the correction applied to the data is the *average* beam factor in each LST-bin of the data. To use the beam factor *interpolated* to the LSTs of the data instead, set ``interpolate_to_lsts`` to True. There are two ways to define the average beam factor within an LST-bin: either by taking the mean of ratios (of beam-weighted foreground model to *reference* beam-weighted foreground model) or the ratio of means. Switch between these by using the ``integrate_before_ratio`` parameter. Parameters ---------- data Data to be calibrated. beam Either a path to a file containing beam correction coefficients, or the BeamFactor object itself. freq_model The (linear) model to use when evaluating the beam factor at the data freqs. integrate_before_ratio Whether to integrate (over time) the beam-weighted sky temperature and the reference sky temperature individually before taking their ratio to get the beam factor. oversample_factor The number of LST samples to use when interpolating the beam factor to the LSTs of the data. For every data LST, ``oversample_factor`` LSTs will be interpolated to (regularly spaced between each data LST, regardless of whether the data LSTs are regular). This is only used if ``resample_beam_lsts`` is True. resample_beam_lsts Whether to resample LSTs before averaging (by ``oversample_factor``). lsts If given, resample to these LSTs exactly, instead of trying to use the LST ranges in the data with oversample_factor. cut_to_data_lsts If True, cut the LSTs at which the beam is sampled to lie within the LST ranges of the data in each LST bin. Only set this to False if you have a single LST bin in the data and you know precisely the LSTs at which you want to sample the beam. """ if isinstance(beam, str | Path): beam = BeamFactor.from_file(beam) if len(data.loads) > 1: raise NotImplementedError( "Can only apply beam correction to data with a single load" ) if len(beam.lsts) < 4 and resample_beam_lsts: raise ValueError( "Your beam has a single LST so you cannot interpolate over LSTs." ) if resample_beam_lsts: if lsts is not None: beam = beam.at_lsts(lsts) else: cut_to_data_lsts = True new_beam_lsts = [] for lst0, lst1 in data.lst_ranges[:, 0, :]: lst1 = lst1.hour if lst1 < lst0.hour: lst1 = lst1 + 24 new_beam_lsts.append( np.linspace(lst0.hour, lst1, oversample_factor + 1)[:-1] ) new_beam_lsts = np.concatenate(new_beam_lsts) beam = beam.at_lsts(new_beam_lsts) new_data = data.data.copy() resids = data.residuals.copy() if data.residuals is not None else None for i, (lst0, lst1) in enumerate(data.lst_ranges[:, 0, :]): new = beam.between_lsts(lst0.hour, lst1.hour) if cut_to_data_lsts else beam if integrate_before_ratio: bf = new.get_integrated_beam_factor( model=freq_model, freqs=data.freqs.to_value("MHz") ) else: bf = new.get_mean_beam_factor( model=freq_model, freqs=data.freqs.to_value("MHz") ) new_data[:, :, i] /= bf if resids is not None: resids[:, :, i] /= bf return data.update(data=new_data, residuals=resids, data_unit="temperature")