Source code for edges.alanmode.alanmode

"""Functions that run the calibration in a style similar to the C-code."""

import logging
from collections.abc import Callable
from pathlib import Path
from typing import Annotated, Self

import attrs
import numpy as np
from astropy import units as un
from astropy.time import Time
from cyclopts import Parameter
from pygsdata import GSData
from pygsdata.select import select_freqs, select_times
from read_acq.gsdata import read_acq_to_gsdata

from edges.cal import sparams as sp
from edges.frequencies import get_mask
from edges.io.calobsdef3 import CalObsDefEDGES3

from .. import modeling as mdl
from .. import types as tp
from ..averaging.freqbin import gauss_smooth
from ..averaging.lstbin import average_over_times
from ..cal.apply import approximate_temperature
from ..cal.calobs import CalibrationObservation, Calibrator, InputSource
from ..cal.dicke import dicke_calibration
from ..cal.loss import (
    LossFunctionGivenSparams,
    get_cable_loss_model,
    get_loss_model_from_file,
)
from ..cal.receiver_cal import get_noise_wave_calibration_iterative
from ..cal.sparams import (
    CalkitReadings,
    ReflectionCoefficient,
    S11ModelParams,
    SParams,
    correct_receiver_for_extra_cable,
)
from ..cal.spectra import LoadSpectrum
from . import alanio

logger = logging.getLogger(__name__)

SOURCES = ["ambient", "hot_load", "open", "short"]


[docs] def reads1p1( res: float, s11_file_open: str, s11_file_short: str, s11_file_load: str, s11_file_ant: str, loadps: float = 33.0, openps: float = 33.0, shortps: float = 33.0, ) -> sp.ReflectionCoefficient: """Reads the s1p1 file and returns the data.""" standards = CalkitReadings( open=sp.ReflectionCoefficient.from_s1p(s11_file_open), short=sp.ReflectionCoefficient.from_s1p(s11_file_short), match=sp.ReflectionCoefficient.from_s1p(s11_file_load), ) load = sp.ReflectionCoefficient.from_s1p(s11_file_ant) freq = standards.freqs calkit = sp.get_calkit(sp.AGILENT_ALAN, resistance_of_match=res * un.ohm) calkit = calkit.clone( short={"offset_delay": shortps * un.ps}, open={"offset_delay": openps * un.ps}, match={"offset_delay": loadps * un.ps}, ) smatrix = sp.SParams.from_calkit_measurements( model=calkit.at_freqs(freq), measurements=standards ) return load.de_embed(smatrix)
[docs] def corrcsv( s11: sp.ReflectionCoefficient, cablen: float, cabdiel: float, cabloss: float, ): """Corrects the S11 data (LNA) for cable effects. This function is a direct translation of the C-code function corrcsv. Parameters ---------- freq : np.ndarray The frequency array. s11 : np.ndarray The S11 data. cablen : float The cable length, in inches. cabdiel : float The cable dielectric constant, as a percent. cabloss : float The cable loss, as a percent. """ cable_length = (cablen * un.imperial.inch).to("m") return correct_receiver_for_extra_cable( gamma=s11, cable_length=cable_length, cable_loss_percent=cabloss, cable_dielectric_percent=cabdiel, )
[docs] @attrs.define(kw_only=True, frozen=True) class ACQPlot7aMoonParams: """Parameters for the ACQPlot7aMoon spectrum reading and averaging script. Parameters ---------- fstart The starting frequency for the spectrum. fstop The stopping frequency for the spectrum. smooth The number of frequency bins to smooth and downsamples by. tload A guess at the internal load temperature, acting as a starting point for iterative calibration fitting, but also is applied to "kind of" calibrate the data before finding RFI. tcal Similar to ``tload``, but for the internal load + noise source. tstart The starting hour (in gha) for reading spectrum integrations. tstop The ending hour (in gha) for reading spectrum integrations. delaystart The number of seconds to delay before including spectra from the files. Can be useful to ignore beginning of files as the system may still be warming up. """ fstart: float = 50.0 fstop: float = 100.0 smooth: int = 8 tload: float = 300.0 tcal: float = 1000.0 tstart: int = 0 tstop: int = 23 delaystart: int = 0
[docs] @classmethod def bowman_2018_defaults( cls, delaystart=7200, smooth=8, fstart=40.0, fstop=110.0, tload=300.0, tcal=1000.0, tstart=0, tstop=23, ): """Construct a set of parameters with defaults matching Bowman+2018.""" return cls( delaystart=delaystart, smooth=smooth, fstart=fstart, fstop=fstop, tload=tload, tcal=tcal, tstart=tstart, tstop=tstop, )
[docs] def acqplot7amoon( acqfile: str | Path, params: ACQPlot7aMoonParams = ACQPlot7aMoonParams(), **kwargs ) -> GSData: """A function that does what the acqplot7amoon C-code does. Parameters ---------- acqfile The path to the ACQ file to process. params The parameters for the ACQPlot7aMoon function. """ if kwargs: params = ACQPlot7aMoonParams(**kwargs) data = read_acq_to_gsdata(acqfile, telescope="edges-low") if params.tstart > 0 or params.tstop < 23: # Note that tstop=23 includes all possible hours since we have <= hours = np.array([x.hour for x in data.times[:, 0].datetime]) data = select_times( data, indx=(hours >= params.tstart) & (hours <= params.tstop) ) if params.delaystart > 0: secs = (data.times - data.times.min()).sec idx = np.all(secs > params.delaystart, axis=1) data = select_times(data, indx=idx) data = select_freqs( data, freq_range=(params.fstart * un.MHz, params.fstop * un.MHz) ) q = dicke_calibration(data) if params.smooth > 0: q = gauss_smooth(q, size=params.smooth, decimate_at=0) q = average_over_times(q) return approximate_temperature(data=q, tload=params.tload, tns=params.tcal)
[docs] @attrs.define(kw_only=True, frozen=True) class EdgesScriptParams: """Parameters for the edges script. These are parameters for the script traditionally called either edges2k.c or edges3.c. Parameters ---------- Lh The mode in which to calculate the loss function. wfstart The lowest frequency included for fitting the calibration functions. wfstop The highest frequency included for fitting the calibration functions. tcold The "true" temperature of the ambient load. thot The "true" temperature of the hot load. tcab The "true" temperature of the long-cable calibration loads. By default, the same as tcold. cfit The number of polynomial terms to use for fitting the scale and offset calibration temperatures (i.e. Tload and Tlns) wfit The number of polynomial terms to use for fitting the noise-wave calibration temperatures. nfit3 The number of terms used to model the receiver S11. nfit2 The number of terms used to model the load S11s. nter The number of iterations to perform when fitting the noise-wave calibration temperatures. lna_poly Whether the receiver S11 should be modeled as a polynomial. 0 for False, anything else will determine the model based on `nfit3`. If `nfit3` is greater than 16, a Fourier model will be used, otherwise a Polynomial model. """ Lh: Annotated[int, Parameter(name=("Lh",))] = -1 wfstart: float = 50 wfstop: float = 190 tcold: float = 306.5 thot: float = 393.22 tcab: float = attrs.field() cfit: int = 7 wfit: int = 7 nfit3: int = 10 nfit2: int = 27 nter: int = 8 lna_poly: int = -1 @tcab.default def _tcab_default(self) -> float: return self.tcold
[docs] @classmethod def bowman_2018_defaults( cls, cfit=6, wfit=5, Lh=-2, # noqa: N803 wfstart=50.0, wfstop=100.0, tcold=296, thot=399, nfit2=27, nfit3=11, lna_poly=0, **kwargs, ) -> Self: """Construct a set of parameters with defaults matching Bowman+2018.""" return cls( cfit=cfit, wfit=wfit, Lh=Lh, wfstart=wfstart, wfstop=wfstop, tcold=tcold, thot=thot, nfit2=nfit2, nfit3=nfit3, lna_poly=lna_poly, **kwargs, )
def _get_specs( spcold: GSData, sphot: GSData, spopen: GSData, spshort: GSData, params: EdgesScriptParams, tload: tp.TemperatureType, tcal: tp.TemperatureType, ) -> dict[str, LoadSpectrum]: specs = {} for name, spec, temp in zip( SOURCES, [spcold, sphot, spopen, spshort], [params.tcold, params.thot, params.tcab, params.tcab], strict=False, ): specs[name] = LoadSpectrum( q=approximate_temperature( spec, tload=tload.to_value("K"), tns=tcal.to_value("K"), reverse=True ), temp_ave=temp * un.K, ) specs[name] = specs[name].between_freqs( params.wfstart * un.MHz, params.wfstop * un.MHz ) return specs def _get_load_s11s( params: EdgesScriptParams, s11cold, s11hot, s11open, s11short, s11mask: np.ndarray, s11freq: tp.FreqType, spec_freqs: tp.FreqType, ) -> tuple[ dict[str, ReflectionCoefficient], S11ModelParams, dict[str, ReflectionCoefficient] ]: raw_s11s = { name: ReflectionCoefficient( reflection_coefficient=s11[s11mask], freqs=s11freq, ) for name, s11 in zip( SOURCES, [s11cold, s11hot, s11open, s11short], strict=False ) } mdltype = mdl.Fourier if params.nfit2 > 16 else mdl.Polynomial s11_modelling_params = S11ModelParams( model=mdltype( n_terms=params.nfit2, transform=( mdl.ZerotooneTransform(range=(1, 2)) if params.nfit2 > 16 else mdl.Log10Transform(scale=1) ), period=1.5, ), complex_model_type=mdl.ComplexRealImagModel, set_transform_range=True, fit_method="alan-qrd", find_model_delay=True, ) s11_models = { name: s11.smoothed(params=s11_modelling_params, freqs=spec_freqs) for name, s11 in raw_s11s.items() } return raw_s11s, s11_modelling_params, s11_models def _get_receiver_s11(params: EdgesScriptParams, s11lna, s11mask, s11freq, spec_fq): mt = mdl.Fourier if (params.nfit3 > 16 or params.lna_poly == 0) else mdl.Polynomial raw_receiver = ReflectionCoefficient( reflection_coefficient=s11lna[s11mask], freqs=s11freq, ) model_transform = ( mdl.ZerotooneTransform(range=(1, 2)) if mt == mdl.Fourier else mdl.Log10Transform(scale=120) ) model_kwargs = {"period": 1.5} if mt == mdl.Fourier else {} receiver_model = S11ModelParams( model=mt(n_terms=params.nfit3, transform=model_transform, **model_kwargs), complex_model_type=mdl.ComplexRealImagModel, set_transform_range=True, fit_method="alan-qrd", find_model_delay=True, ) receiver = raw_receiver.smoothed(receiver_model, freqs=spec_fq) return raw_receiver, receiver_model, receiver def _get_hotload_loss( params: EdgesScriptParams, s11rig, s12rig, s22rig, s11mask, s11freq, spec_fq, ) -> Callable | None: if params.Lh == -1: hot_loss_model = get_cable_loss_model("UT-141C-SP") elif params.Lh == -2: if s11rig is None or s12rig is None or s22rig is None: raise ValueError("must provide rigid cable s11/s12/s22 if Lh=-2") mdltype = mdl.Fourier if params.nfit2 > 16 else mdl.Polynomial mdlopts = { "transform": ( mdl.ZerotooneTransform( range=(s11freq.min().to_value("MHz"), s11freq.max().to_value("MHz")) ) if params.nfit2 > 16 else mdl.Log10Transform(scale=1) ), "n_terms": params.nfit2, } if params.nfit2 > 16: mdlopts["period"] = 1.5 hlc_model_params = S11ModelParams( model=mdltype(**mdlopts), set_transform_range=False, complex_model_type=mdl.ComplexRealImagModel, fit_method="lstsq", combine_s12s21=False, ) hot_load_cable_s11 = SParams( s11=s11rig, s12=s12rig, s22=s22rig, freqs=s11freq, ).smoothed(params=hlc_model_params, freqs=spec_fq) # Here we do something a little bogus. Alan fits the smoothing model # to the product s12*s21, which is what s12rig represents. So that's # why we passed s12rig as s12 above (instead of passing sqrt(s12rig)). # Now, we need to take the sqrt of s12 so have the right values. hot_load_cable_s11 = attrs.evolve( hot_load_cable_s11, s12=np.sqrt(hot_load_cable_s11.s12), s21=np.sqrt(hot_load_cable_s11.s12), ) hot_loss_model = LossFunctionGivenSparams(sparams=hot_load_cable_s11) elif isinstance(params.Lh, Path): hot_loss_model = get_loss_model_from_file(params.Lh) else: hot_loss_model = None return hot_loss_model
[docs] def edges( spcold: GSData, sphot: GSData, spopen: GSData, spshort: GSData, s11freq: np.ndarray, s11hot: np.ndarray, s11cold: np.ndarray, s11lna: np.ndarray, s11open: np.ndarray, s11short: np.ndarray, tload: float, tcal: float, params: EdgesScriptParams = EdgesScriptParams(), s11rig: np.ndarray | None = None, s12rig: np.ndarray | None = None, s22rig: np.ndarray | None = None, **kwargs, ) -> tuple[ CalibrationObservation, Calibrator, S11ModelParams, S11ModelParams, Callable | None ]: """A function that does what the edges3.c and edges2k.c C-code do. The primary purpose of this function is to model the input S11's, and then determine the noise-wave parameters. Parameters ---------- spcold, sphot, spopen, spshort The time-averaged spectra for the ambient (cold), hot_load, open, and short loads respectively. s11freq The frequencies at which the S11 data is sampled. s11hot, s11cold, s11lna, s11open, s11short The S11 measurements for the hot, ambient, LNA, open, and short loads respectively. tload A guess of the internal load temperature, used as the initial guess for the optimization. **MUST MATCH** tload used to generate the time-averaged spectra. tcal Like tload, but for the internal load + noise source. params An object defining the parameters used in determining the calibration. s11rig, s12rig, s22rig The S11, S12, and S22 measurements for the semi-rigid cable respectively. Optional -- generally required for EDGES-2. Returns ------- calobs The CalibrationObservation that holds all the data relevant for performing the receiver calibration. calibrator The final calibration solutions. s11_model_params The parameters used to create models of the load S11s. receiver_model_params The parameters used to create models of the receiver S11. hot_loss_model The model used to account for losses in the hot load. """ if kwargs: params = EdgesScriptParams(**kwargs) # First set up the S11 models specs = _get_specs(spcold, sphot, spopen, spshort, params, tload, tcal) spec_fq = specs["ambient"].freqs s11mask = get_mask( s11freq, low=params.wfstart * un.MHz, high=params.wfstop * un.MHz ) s11freq = s11freq[s11mask] raw_load_s11s, s11_model_params, load_s11s = _get_load_s11s( params, s11cold, s11hot, s11open, s11short, s11mask, s11freq, spec_fq ) raw_receiver, receiver_model_params, receiver = _get_receiver_s11( params, s11lna, s11mask, s11freq, spec_fq=spec_fq ) hot_loss_model = _get_hotload_loss( params, s11rig, s12rig, s22rig, s11mask, s11freq, spec_fq ) loads = { name: InputSource( spectrum=specs[name], reflection_coefficient=load_s11s[name], raw_s11=raw_load_s11s[name], loss=hot_loss_model(load_s11s[name]) if name == "hot_load" and hot_loss_model else np.ones(spec_fq.size), ambient_temperature=params.tcold * un.K, ) for name in specs } calobs = CalibrationObservation( loads=loads, receiver=receiver, raw_receiver=raw_receiver ) calibrator = get_noise_wave_calibration_iterative( calobs, cterms=params.cfit, wterms=params.wfit, apply_loss_to_true_temp=False, smooth_scale_offset_within_loop=False, ncal_iter=params.nter, cable_delay_sweep=np.arange(0, -1e-8, -1e-9), # hard-coded in the C code. fit_method="lstsq", # "alan-qrd", scale_offset_poly_spacing=0.5, t_load_guess=tload, t_load_ns_guess=tcal, ) return calobs, calibrator, s11_model_params, receiver_model_params, hot_loss_model
def _average_spectra( specfiles: dict[str, list[Path]], out: Path, redo_spectra: bool, fstart, fstop, telescope: str, **kwargs, ) -> GSData: spectra = {} for load, files in specfiles.items(): outfile = out / f"sp{load}.txt" if redo_spectra or not outfile.exists(): if len(files) == 0: raise ValueError(f"{load} has no spectrum files!") logger.info(f"Averaging {load} spectra") spectra[load] = acqplot7amoon( acqfile=files, fstart=fstart, fstop=fstop, **kwargs ) alanio.write_spec_txt_gsd(spectra[load], outfile) # Always read the spectra back in, because that's what Alan's C-code does. # This has the small effect of reducing the precision of the spectra. logger.info(f"Reading averaged {load} spectra") spectra[load] = alanio.read_spec_txt( outfile, time=spectra[load].times[0, 0] if load in spectra else Time.now(), telescope=telescope, name=load, ) return spectra
[docs] @attrs.define(kw_only=True, frozen=True) class Edges3CalobsParams: """ Parameters defining the calibration observation data for EDGES 3. Parameters ---------- specyear The year of the spectrum data. specday The day of the spectrum data. s11date The date of the S11 measurement in the format YYYY_DDD_HH. datadir The root directory of the observation data. match_resistance The measured impedance of the "match" calkit standard. Used to calibrate the Receiver s11. calkit_delays The delays of the three calkit standards. To set each individually, use the ``load_delay``, ``open_delay``, and ``short_delay`` parameters. lna_cable_length load_delay The delay of the "load" calkit stsandard. By default the same as ``calkit_delays``. open_delay The delay of the "open" calkit standard. By default the same as ``calkit_delays``. short_delay The delay of the "short" calkit standard. By default the same as ``calkit_delays``. lna_cable_length The length of the cable joining the receiver to the VNA in inches. lna_cable_loss The loss of the cable joining the receiver to the VNA in percent. lna_cable_dielectric The dielectric constant of the cable joining the receiver to the VNA in percent. """ specyear: int specday: int s11date: str datadir: Path = Path("/data5/edges/data/EDGES3_data/MRO/") match_resistance: Annotated[float, Parameter(name=("res",))] = 49.8 calkit_delays: Annotated[float, Parameter(name=("ps",))] = 33 load_delay: float = attrs.field() open_delay: float = attrs.field() short_delay: float = attrs.field() lna_cable_length: Annotated[float, Parameter(name=("cablen",))] = 4.26 lna_cable_loss: Annotated[float, Parameter(name=("cabloss",))] = -91.5 lna_cable_dielectric: Annotated[float, Parameter(name=("cabdiel",))] = -1.24 @load_delay.default def _load_delay_default(self): return self.calkit_delays @open_delay.default def _open_delay_default(self): return self.calkit_delays @short_delay.default def _short_delay_default(self): return self.calkit_delays
[docs] def get_caldef(self) -> CalObsDefEDGES3: """Get a calibration file definition.""" return CalObsDefEDGES3.from_standard_layout( rootdir=self.datadir, year=self.specyear, day=self.specday, s11_year=int(self.s11date.split("_")[0]) if self.s11date else None, s11_day=int(self.s11date.split("_")[1]) if self.s11date else None, s11_hour=int(self.s11date.split("_")[2]) if self.s11date else None, )
[docs] def get_raw_s11s(self): """Read all the raw S11 information.""" caldef = self.get_caldef() raws11s = {} for load in [*list(caldef.loads.keys()), "receiver_s11"]: if load in caldef.loads: s11def = caldef.loads[load].s11 else: s11def = caldef.receiver_s11 raws11s[load] = reads1p1( s11_file_open=s11def.calkit.open, s11_file_short=s11def.calkit.short, s11_file_load=s11def.calkit.match, s11_file_ant=s11def.external, res=self.match_resistance, loadps=self.load_delay, openps=self.open_delay, shortps=self.short_delay, ) if load == "receiver_s11": # Correction for path length raws11s[load] = corrcsv( raws11s[load], self.lna_cable_length, self.lna_cable_dielectric, self.lna_cable_loss, ) # Update the precision of the raws11s because we need to match the # C-code which writes to file and reads it back in. s11freq = raws11s[load].freqs np.round(s11freq, decimals=16, out=s11freq) raws11s[load] = ( np.round(raws11s[load].reflection_coefficient.real, decimals=16) + np.round(raws11s[load].reflection_coefficient.imag, decimals=16) * 1j ) return s11freq, raws11s
[docs] def get_spectrum_files(self) -> dict[str, list[Path]]: """Return a dictionary of all associated spectrum files.""" caldef = self.get_caldef() return {name: load.spectra for name, load in caldef.loads.items()}
[docs] @attrs.define(kw_only=True, frozen=True) class Edges2CalobsParams: """Parameters defining the calibration observation data for EDGES 2. Parameters ---------- s11_path The path to the S11 measurement file. Currently, this file must be a single, precalibrated file in Raul's legacy output format (CSV). ambient_acqs A list of paths to the ambient acq spectra files. hotload_acqs A list of paths to the hot load acq spectra files. open_acqs A list of paths to the open acq spectra files. short_acqs A list of paths to the short acq spectra files. """ s11_path: Path ambient_acqs: list[Path] hotload_acqs: list[Path] open_acqs: list[Path] short_acqs: list[Path]
[docs] def get_raw_s11s(self): """Read all the raw S11 information.""" s11s = alanio.read_raul_s11_format(self.s11_path) s11freq = s11s.pop("freq") << un.MHz s11s = {alanio.LOADMAP.get(load, load): val for load, val in s11s.items()} return s11freq, s11s
[docs] def get_spectrum_files(self): """Return a dictionary of all associated spectrum files.""" return { "ambient": self.ambient_acqs, "hot_load": self.hotload_acqs, "open": self.open_acqs, "short": self.short_acqs, }
[docs] def alancal( defparams: Edges3CalobsParams | Edges2CalobsParams, out: Path = Path(), redo_spectra: bool = False, redo_cal: bool = True, acqparams: ACQPlot7aMoonParams = ACQPlot7aMoonParams(), calparams: EdgesScriptParams = EdgesScriptParams(), ) -> tuple[ CalibrationObservation, Calibrator, S11ModelParams, S11ModelParams, Callable | None ]: """Run a calibration in as close a manner to Alan's code as possible. This exists mostly for being able to compare to Alan's memos etc in an easy way. It is much less flexible than using the library directly, and is not recommended for general use. This is supposed to emulate one of Alan's C-shell scripts, usually called "docal", and thus it runs a complete calibration, not just a single part. However, you can turn off parts of the calibration by setting the appropriate flags to False. Parameters ---------- defparams Parameters that define where to find files and how to read/calibrate the raw S11s. This is different between EDGES3 and EDGES2. out A directory where outputs can be written. redo_spectra Whether to re-average the spectra if they already exist in the output directory. redo_cal Whether to re-compute the calibration coefficients if they already exist. acqparams Parameters governing how to average the spectrum files. calparams Parameters governing how to model S11s and perform the calibration. Returns ------- calobs The CalibrationObservation that holds all the data relevant for performing the receiver calibration. calibrator The final calibration solutions. s11_model_params The parameters used to create models of the load S11s. receiver_model_params The parameters used to create models of the receiver S11. hot_loss_model The model used to account for losses in the hot load. """ out = Path(out) s11freq, raws11s = defparams.get_raw_s11s() specfiles = defparams.get_spectrum_files() if "receiver_s11" in raws11s: lna = raws11s.pop("receiver_s11") else: lna = raws11s.pop("lna") # Now average the spectra spectra = _average_spectra( specfiles=specfiles, out=out, redo_spectra=redo_spectra, fstart=acqparams.fstart, fstop=acqparams.fstop, smooth=acqparams.smooth, tload=acqparams.tload, tcal=acqparams.tcal, tstart=acqparams.tstart, tstop=acqparams.tstop, delaystart=acqparams.delaystart, telescope="edges3" if isinstance(defparams, Edges3CalobsParams) else "edges-low", ) # Now do the calibration outfile = out / "specal.txt" if not redo_cal and outfile.exists(): return None logger.info("Performing calibration") return edges( spcold=spectra["ambient"], sphot=spectra["hot_load"], spopen=spectra["open"], spshort=spectra["short"], s11freq=s11freq, s11cold=raws11s["ambient"], s11hot=raws11s["hot_load"], s11open=raws11s["open"], s11short=raws11s["short"], s11lna=lna, s11rig=raws11s.get("s11rig"), s12rig=raws11s.get("s12rig"), s22rig=raws11s.get("s22rig"), params=calparams, tcal=acqparams.tcal * un.K, tload=acqparams.tload * un.K, )