"""Functions for writing/reading to/from file formats used in the C pipeline."""
from pathlib import Path
import numpy as np
from astropy import units as un
from astropy.table import QTable
from astropy.time import Time
from bidict import bidict
from pygsdata import GSData, GSFlag, History, Stamp
from edges.cal.loss import LossFunctionGivenSparams
from .. import types as tp
from ..cal import CalibrationObservation, Calibrator
from ..const import KNOWN_TELESCOPES
# A bi-directional mapping between load-names as used in the C code
# vs load names in the python code.
LOADMAP = bidict({
"amb": "ambient",
"hot": "hot_load",
"open": "open",
"short": "short",
})
SPEC_LOADMAP = bidict({
"load": "ambient",
"hot": "hot_load",
"open": "open",
"short": "short",
})
[docs]
def read_s11_csv(fname) -> tuple[np.ndarray, np.ndarray]:
"""Read a CSV file containing S11 data in Alan's output format."""
with open(fname) as fl:
data = np.genfromtxt(fl, delimiter=",", skip_header=1, skip_footer=1)
freq = data[:, 0]
s11 = data[:, 1] + data[:, 2] * 1j
return freq, s11
[docs]
def write_s11_csv(freq, s11, fname):
"""Write a standard S11 CSV file."""
# write out the CSV file
with fname.open("w") as fl:
fl.write("BEGIN\n")
for freq_, s11_ in zip(freq, s11, strict=False):
fl.write(
f"{freq_.to_value('MHz'):1.16e},{s11_.real:1.16e},{s11_.imag:1.16e}\n"
)
fl.write("END")
[docs]
def read_spec_txt(
fname: tp.PathLike,
time: Time | None = None,
telescope: str = "edges-low",
name: str = "",
) -> GSData:
"""Read an averaged-spectrum file, like the ones output by acqplot7amoon."""
out = np.genfromtxt(
fname,
names=["freq", "spectra", "weight"],
comments="/",
usecols=[0, 1, 2],
)
with open(fname) as fl:
n = int(fl.readline()[31:].split(" ")[0])
if time is None:
time = Time.now()
return GSData(
telescope=KNOWN_TELESCOPES.get(telescope),
data=out["spectra"][None, None, None, :],
freqs=out["freq"] * un.MHz,
times=time + np.zeros((1, 1)) * un.second,
effective_integration_time=13.0 * un.second,
nsamples=out["weight"][None, None, None, :] * n,
flags={"flags": GSFlag(~(out["weight"].astype(bool)), axes=("freq",))},
history=History([Stamp(f"read from {fname}")]),
data_unit="uncalibrated_temp",
name=name,
)
[docs]
def write_spec_txt(
freq: np.ndarray | tp.FreqType, n: float, spec: np.ndarray, fname: tp.PathLike
):
"""Write an averaged-spectrum file, like spe_<load>r.txt files from edges2k.c."""
if hasattr(freq, "unit"):
freq = freq.to_value("MHz")
with open(fname, "w") as fl:
for i, (f, sp) in enumerate(zip(freq, spec, strict=False)):
if i == 0:
fl.write(f"{f:12.6f} {sp:12.6f} {1:4.0f} {n:d} // temp.acq\n")
else:
fl.write(f"{f:12.6f} {sp:12.6f} {1:4.0f}\n")
[docs]
def write_spec_txt_gsd(gsd: GSData, fname: tp.PathLike):
"""Write a standard spe.txt file given a GSData object."""
write_spec_txt(
freq=gsd.freqs,
n=int(np.mean(gsd.nsamples)),
spec=gsd.data[0, 0, 0],
fname=fname,
)
[docs]
def read_specal(fname: tp.PathLike, t_load: float, t_load_ns: float) -> Calibrator:
"""Read a specal file, like the ones output by edges3(k)."""
data = np.genfromtxt(
fname,
names=[
"freq",
"s11lna_real",
"s11lna_imag",
"C1",
"C2",
"Tunc",
"Tcos",
"Tsin",
"weight",
],
usecols=(1, 3, 4, 6, 8, 10, 12, 14, 16),
)
mask = data["weight"] > 0
data = data[mask]
return Calibrator(
freqs=data["freq"] * un.MHz,
Tsca=t_load_ns * data["C1"],
Toff=t_load - data["C2"],
Tunc=data["Tunc"],
Tcos=data["Tcos"],
Tsin=data["Tsin"],
receiver_s11=data["s11lna_real"] + 1j * data["s11lna_imag"],
)
[docs]
def read_specal_iter(fname: tp.PathLike) -> np.ndarray:
"""
Read a specal file, like the ones output by edges3(k).
The outputs are from an intermediate iteration of NW modeling step.
"""
return np.genfromtxt(
fname,
names=[
"iter",
"freq",
"C1",
"C2",
"Tunc",
"Tcos",
"Tsin",
],
usecols=(1, 3, 5, 7, 9, 11, 13),
)
[docs]
def read_alan_calibrated_temp(fname: tp.PathLike) -> np.ndarray:
"""
Read calibrated_{load}.txt from edges3.c.
This gives uncalibrated and calibrated temperatures.
"""
return np.genfromtxt(
fname,
names=[
"freq",
"uncal",
"cal",
],
delimiter=" ",
skip_header=1,
)
[docs]
def write_specal(
calibrator: Calibrator,
outfile: tp.PathLike,
t_load: float,
t_load_ns: float,
precision="10.6f",
):
"""Write a specal file in the same format as those output by the C-code edges3.c."""
sca = calibrator.Tsca / t_load_ns
ofs = t_load - calibrator.Toff
tlnau = calibrator.Tunc
tlnac = calibrator.Tcos
tlnas = calibrator.Tsin
lna = calibrator.receiver_s11
with open(outfile, "w") as fl:
fl.writelines(
f"freq {calibrator.freqs[i].to_value('MHz'):{precision}} "
f"s11lna {lna[i].real:{precision}} {lna[i].imag:{precision}} "
f"sca {sca[i]:{precision}} ofs {ofs[i]:{precision}} "
f"tlnau {tlnau[i]:{precision}} tlnac {tlnac[i]:{precision}} "
f"tlnas {tlnas[i]:{precision}} wtcal 1 cal_data\n"
for i in range(calibrator.freqs.size)
)
[docs]
def write_modelled_s11s(
calobs: CalibrationObservation, fname: tp.PathLike, hot_loss_model=None
):
"""Write all modelled S11's in a calobs object to file, in the same format as C.
If a HotLoadCorrection exists, also write the rigid cable S-parameters, as
edges2k.c does, otherwise assume the edges3.c format.
"""
s11m = {
name: load.reflection_coefficient.s11 for name, load in calobs.loads.items()
}
lna = calobs.receiver.s11
if isinstance(hot_loss_model, LossFunctionGivenSparams):
s11m |= {
"rig_s11": hot_loss_model.sparams.s11,
"rig_s12": hot_loss_model.sparams.s12**2,
"rig_s22": hot_loss_model.sparams.s22,
}
with open(fname, "w") as fl:
if "rig_s11" in s11m:
fl.write(
"# freq, amb_real amb_imag hot_real hot_imag open_real open_imag "
"short_real short_imag lna_real lna_imag rig_s11_real rig_s11_imag "
"rig_s12_real rig_s12_imag rig_s22_real rig_s22_imag\n"
)
fl.writelines(
f"{f} {amb.real} {amb.imag} "
f"{hot.real} {hot.imag} "
f"{op.real} {op.imag} "
f"{sh.real} {sh.imag} "
f"{lna[i].real} {lna[i].imag} "
f"{rigs11.real} {rigs11.imag} "
f"{rigs12.real} {rigs12.imag} "
f"{rigs22.real} {rigs22.imag}\n"
for i, (f, amb, hot, op, sh, rigs11, rigs12, rigs22) in enumerate(
zip(
calobs.freqs.to_value("MHz"),
s11m["ambient"],
s11m["hot_load"],
s11m["open"],
s11m["short"],
s11m["rig_s11"],
s11m["rig_s12"],
s11m["rig_s22"],
strict=False,
)
)
)
else:
fl.write(
"# freq, amb_real amb_imag hot_real hot_imag open_real open_imag "
"short_real short_imag lna_real lna_imag\n"
)
fl.writelines(
f"{f} {amb.real} {amb.imag} "
f"{hot.real} {hot.imag} "
f"{op.real} {op.imag} "
f"{sh.real} {sh.imag} "
f"{lna[i].real} {lna[i].imag}\n"
for i, (f, amb, hot, op, sh) in enumerate(
zip(
calobs.freqs.to_value("MHz"),
s11m["ambient"],
s11m["hot_load"],
s11m["open"],
s11m["short"],
strict=False,
)
)
)
[docs]
def read_modelled_s11s(pth: Path) -> QTable:
"""Read modelled S11s from a csv file."""
raw_s11m = np.genfromtxt(pth, comments="#", names=True)
s11m = {}
freq = raw_s11m["freq"]
for name in raw_s11m.dtype.names:
if name == "freq":
s11m["freqs"] = freq * un.MHz
continue
bits = name.split("_")
cmp = bits[-1]
load = "_".join(bits[:-1])
if load == "lna":
load = "receiver"
elif load.startswith("rig"):
load = f"semi_rigid {load.split('_')[-1]}"
else:
load = LOADMAP[load]
if load not in s11m:
s11m[load] = np.zeros(len(raw_s11m), dtype=complex)
s11m[load] += raw_s11m[name] if cmp == "real" else raw_s11m[name] * 1j
return QTable(s11m)
[docs]
def read_spe_file(
filename: tp.PathLike,
time: Time = Time.now(),
telescope: str = "edges-low",
name: str = "",
):
"""Read Alan's spectrum files with formats like those of spe0.txt."""
out = np.genfromtxt(
filename,
usecols=(1, 3, 6, 9, 12),
names=("freq", "tant", "model", "resid", "weight"),
)
return GSData(
telescope=KNOWN_TELESCOPES.get(telescope),
data=out["tant"][None, None, None, :],
freqs=out["freq"] * un.MHz,
times=[[time]],
effective_integration_time=13.0 * un.second,
nsamples=out["weight"][None, None, None, :],
residuals=out["resid"][None, None, None],
flags={"flags": ~(out["weight"].astype(bool))},
history=History([Stamp(f"read from {filename}")]),
data_unit="temperature",
name=name,
)
[docs]
def read_all_spec_txt(direc: Path, flow, fhigh):
"""Read all spe_*r.txt files from a directory into a dict of spectra."""
spec = {}
allfiles = direc.glob("spe_*r.txt")
for fl in allfiles:
load = fl.name.split("_")[1][:-5]
s = read_spec_txt(fl)
freq = s.freqs
mask = (freq >= flow) * (freq <= fhigh)
spec[SPEC_LOADMAP[load]] = s.data.squeeze()[mask]
return freq[mask], spec