"""Beam models and chromaticity corrections."""
import logging
from collections.abc import Callable, Sequence
from pathlib import Path
from typing import Literal, Self
import attrs
import numpy as np
import scipy.interpolate as spi
from astropy import units as u
from edges.data import BEAM_PATH
from .. import modeling as mdl
from .. import types as tp
from ..config import config
from ..units import vld_unit
logger = logging.getLogger(__name__)
[docs]
@attrs.define(kw_only=True)
class Beam:
"""A beam model object.
Attributes
----------
beam
The beam model, as a function of frequency, elevation, and azimuth.
frequency
The frequencies at which the beam is defined.
elevation
The elevation angles at which the beam is defined.
azimuth
The azimuth angles at which the beam is defined.
simulator
The simulator used to generate the beam.
instrument
The instrument for which the beam is defined.
raw_file
The path to the raw file from which the beam was read.
"""
beam: np.ndarray = attrs.field()
frequency: tp.FreqType = attrs.field(validator=vld_unit("frequency"))
elevation: np.ndarray = attrs.field(converter=np.asarray)
azimuth: np.ndarray = attrs.field(converter=np.asarray)
simulator: str | None = attrs.field(default=None, converter=str)
instrument: str | None = attrs.field(default=None, converter=str)
raw_file: str | None = attrs.field(default=None, converter=str)
[docs]
@classmethod
def from_hfss(
cls,
path: tp.PathLike,
frequency: tp.FreqType,
linear: bool = True,
theta_min: float = 0,
theta_max: float = 180,
theta_resolution: float = 1,
phi_min: float = 0,
phi_max: float = 359,
phi_resolution: float = 1,
) -> Self:
"""
Create a Beam object from a HFSS file.
Parameters
----------
path
Path to the file. Use :meth:`resolve_file` to get the absolute path
from a a relative path (starting with ':').
linear
Whether the beam values are in linear units (or decibels)
theta_min, theta_max
Min/Max of the zenith angle (degrees)
theta_resolution
Resolution of the zenith angle.
phi_min, phi_max
Min/Max of the azimuth angles (degrees)
phi_resolution
The resolution of the azimuth angle.
Returns
-------
beam
The beam object.
"""
d = np.genfromtxt(path, skip_header=1, delimiter=",")
theta = np.arange(theta_min, theta_max + theta_resolution, theta_resolution)
phi = np.arange(phi_min, phi_max + phi_resolution, phi_resolution)
beam_map = np.zeros((len(theta), len(phi)))
for i in range(len(theta)):
mask = d[:, 1] == theta[i]
this_phi = d[mask, 0]
this_power = d[mask, 2]
this_phi[this_phi < 0] += 360
this_phi, unique_inds = np.unique(this_phi, axis=0, return_index=True)
this_power = this_power[unique_inds]
this_power = this_power[np.argsort(this_phi)]
if not linear:
this_power = 10 ** (this_power / 10)
this_power[np.isnan(this_power)] = 0
beam_map[i, :] = this_power
return Beam(
frequency=np.array([frequency]),
elevation=theta,
azimuth=phi,
beam=beam_map,
simulator="hfss",
raw_file=path,
)
[docs]
@classmethod
def from_wipld(cls, path: tp.PathLike, az_antenna_axis: float = 0) -> Self:
"""Read a WIPL-D beam.
Parameters
----------
path
The path to the file.
az_antenna_axis
The azimuth of the primary antenna axis, in degrees.
"""
with Path(path).open("r") as fn:
file_length = 0
number_of_frequencies = 0
flag_columns = False
frequencies_list = []
for line in fn:
file_length += 1
if line[2] == ">":
number_of_frequencies += 1
frequencies_list.append(float(line[19:32]))
elif not flag_columns:
line_splitted = line.split()
number_of_columns = len(line_splitted)
flag_columns = True
rows_per_frequency = (
file_length - number_of_frequencies
) / number_of_frequencies
output = np.zeros((
int(number_of_frequencies),
int(rows_per_frequency),
int(number_of_columns),
))
frequencies = np.array(frequencies_list)
with Path(path).open("r") as fn:
i = -1
for line in fn:
if line[2] == ">":
i += 1
j = -1
else:
j += 1
line_splitted = line.split()
line_array = np.array(line_splitted, dtype=float)
output[i, j, :] = line_array
# Re-arrange data
phi_u = np.unique(output[0, :, 0])
theta_u = np.unique(output[0, :, 1])
beam = np.zeros((len(frequencies), len(theta_u), len(phi_u)))
for i in range(len(frequencies)):
out2d = output[i, :, :]
phi = out2d[:, 0]
theta = 90 - out2d[:, 1] # theta is zero at the zenith, and goes to 180 deg
gain = out2d[:, 6]
theta_u = np.unique(theta)
it = np.argsort(theta_u)
theta_a = theta_u[it]
for j in range(len(theta_a)):
phi_j = phi[theta == theta_a[j]]
gain_j = gain[theta == theta_a[j]]
ip = np.argsort(phi_j)
gp = gain_j[ip]
beam[i, j, :] = gp
# Flip beam from theta to elevation
beam_maps = beam[:, ::-1, :]
# Change coordinates from theta/phi, to AZ/EL
el = np.arange(0, 91)
az = np.arange(0, 360)
# Shifting beam relative to true AZ (referenced at due North)
beam_maps_shifted = cls.shift_beam_maps(az_antenna_axis, beam_maps)
return Beam(
frequency=frequencies,
azimuth=az,
elevation=el,
beam=beam_maps_shifted,
simulator="wipl-d",
raw_file=path,
)
[docs]
@classmethod
def from_feko(cls, path: str | Path, az_antenna_axis: float = 0) -> Self:
"""
Read a FEKO beam file.
Parameters
----------
filename
The path to the file.
az_antenna_axis
The azimuth of the primary antenna axis, in degrees.
"""
filename = Path(path)
data = np.loadtxt(str(filename), usecols=list(range(2, 93)))
frequency = []
with filename.open("r") as fl:
for line in fl:
if line.startswith("#FREQUENCY"):
line = line.split("MHz")[0].strip().split(" ")
frequency.append(float(line[-1]))
freq = np.array(frequency) * u.MHz
# Loading data and convert to linear representation
beam_maps = np.zeros((len(frequency), 91, 360))
for i in range(len(frequency)):
beam_maps[i] = (10 ** (data[(i * 360) : ((i + 1) * 360)] / 10)).T
# Shifting beam relative to true AZ (referenced at due North)
# Due to angle of orientation of excited antenna panels relative to due North
beam_maps = cls.shift_beam_maps(az_antenna_axis, beam_maps)
return Beam(
frequency=freq,
beam=beam_maps,
azimuth=np.arange(0, 360),
elevation=np.arange(0, 91),
simulator="feko",
raw_file=path,
)
[docs]
@classmethod
def from_cst(
cls,
file_name_prefix: tp.PathLike,
f_low: int = 40,
f_high: int = 100,
freq_p: int = 61,
theta_p: float = 181,
phi_p: float = 361,
az_antenna_axis: float = 0,
) -> Self:
"""
Read a CST beam file.
Parameters
----------
filename
The path to the file.
az_antenna_axis
The azimuth of the primary antenna axis, in degrees.
f_low, f_high
lower and higher frequency bounds
freq_p
Number of frequency points in the simulation file.
theta_p
Number of zenith angle points in the simulation file.
phi_p
Number of azimuth points in the simulation file.
Returns
-------
beam
The beam object.
"""
phi_t = np.zeros((freq_p, theta_p * phi_p))
frequency = np.linspace(f_low, f_high, freq_p)
beam_square = np.zeros((freq_p, theta_p, phi_p))
res = (f_high - f_low) / (freq_p - 1)
for i in range(freq_p):
n = int(i * res + f_low)
thispath = Path(f"{file_name_prefix}farfield (f={n}) [1].txt")
with thispath.open("rb") as fc_file:
for x, line in enumerate(fc_file):
if x > 1:
check = 0
for o in range(len(line)):
if line[o] != "":
check = check + 1
if check == 3:
phi_t[i][x - 2] = float(line.split()[2])
for i in range(freq_p):
for x in range(phi_p):
beam_square[i, :, x] = phi_t[i, x * theta_p : (x + 1) * theta_p]
beam_square[i, :, 0 : int(phi_p / 2)] = beam_square[
i, :, int(phi_p / 2) : phi_p - 1
]
freq = np.array(frequency) * u.MHz
beam_square[:, 91:, :] = 0
beam_maps = np.flip(beam_square[:, :91, :360], axis=1)
# Shifting beam relative to true AZ (referenced at due North)
# Due to angle of orientation of excited antenna panels relative to due North
beam_maps = cls.shift_beam_maps(az_antenna_axis, beam_maps)
return Beam(
frequency=freq,
beam=beam_maps,
azimuth=np.arange(0, 360),
elevation=np.arange(0, 91),
simulator="cst",
raw_file=file_name_prefix,
)
[docs]
@classmethod
def from_feko_raw(
cls,
file_name_prefix: tp.PathLike,
ext: str = "txt",
f_low: int = 40,
f_high: int = 100,
freq_p: int = 61,
theta_p: float = 181,
phi_p: float = 361,
az_antenna_axis: float = 0,
) -> Self:
"""
Read a FEKO beam file.
Parameters
----------
filename
The path to the file.
az_antenna_axis
The azimuth of the primary antenna axis, in degrees.
f_low, f_high
lower and higher frequency bounds
freq_p
Number of frequency points in the simulation file.
theta_p
Number of zenith angle points in the simulation file.
phi_p
Number of azimuth points in the simulation file.
Returns
-------
beam
The beam object.
"""
beam_square = np.zeros((freq_p, theta_p, phi_p))
frequency = np.linspace(f_low, f_high, freq_p)
# TODO: change this to no.of theta * no.of phi + No.of header lines
z = theta_p * 91 + 10
def read_file(path, idx: int):
with path.open("r") as fl:
for index, line in enumerate(fl):
if index % z == 0:
co = 0
if index % z >= 10:
x = list(map(float, line.split()))
here = int(index / z), co % theta_p, int(co / theta_p) + idx
beam_square[here] = 10 ** (x[8] / 10)
co += 1
read_file(Path(f"{file_name_prefix}_0-90.{ext}"), 0)
z = theta_p * 90 + 10 #
read_file(Path(f"{file_name_prefix}_91-180.{ext}"), 91)
read_file(Path(f"{file_name_prefix}_181-270.{ext}"), 181)
read_file(Path(f"{file_name_prefix}_271-360.{ext}"), 271)
freq = np.array(frequency) * u.MHz
beam_square[:, 90:, :] = 0
beam_maps = np.flip(beam_square[:, :91, :360], axis=1)
# Shifting beam relative to true AZ (referenced at due North)
# Due to angle of orientation of excited antenna panels relative to due North
beam_maps = cls.shift_beam_maps(az_antenna_axis, beam_maps)
return Beam(
frequency=freq,
beam=beam_maps,
azimuth=np.arange(0, 360),
elevation=np.arange(0, 91),
simulator="feko",
raw_file=file_name_prefix,
)
[docs]
@classmethod
def get_beam_path(cls, band: str, kind: str | None = None) -> Path:
"""Get a standard path to a beam file."""
pth = f"{kind}.txt" if kind else BEAM_PATH / band / "default.txt"
if not pth.exists():
raise FileNotFoundError(f"No beam exists for band={band}.")
return pth
[docs]
def select_freqs(self, indx: Sequence[int]) -> Self:
"""Select a subset of frequencies.
Parameters
----------
indx
The indices of the frequencies to select.
"""
if not hasattr(indx, "__len__"):
indx = [indx]
return attrs.evolve(
self,
frequency=self.frequency[indx],
beam=self.beam[indx],
)
[docs]
def at_freq(
self,
freq: tp.FreqType,
model: mdl.Model = mdl.Polynomial(
n_terms=13, transform=mdl.ScaleTransform(scale=75.0)
),
**fit_kwargs,
) -> Self:
"""
Interpolate the beam to a new set of frequencies.
Parameters
----------
freq
Frequencies to interpolate to.
model
The model to use for interpolation.
fit_kwargs
Keyword arguments to pass to the model fit.
Returns
-------
beam
The Beam object at the new frequencies.
"""
if len(self.frequency) < 3:
raise ValueError(
"Can't freq-interpolate beam that has fewer than three frequencies."
)
# Frequency interpolation
interp_beam = np.zeros((len(freq), len(self.elevation), len(self.azimuth)))
cached_model = model.at(x=self.frequency.to_value("MHz"))
for i, bm in enumerate(self.beam.T):
for j, b in enumerate(bm):
model_fit = cached_model.fit(ydata=b, **fit_kwargs)
interp_beam[:, j, i] = model_fit.evaluate(freq.to_value("MHz"))
return Beam(
frequency=freq,
azimuth=self.azimuth,
elevation=self.elevation,
beam=interp_beam,
)
[docs]
def smoothed(
self, model: mdl.Model = mdl.Polynomial(n_terms=12), **fit_kwargs
) -> Self:
"""
Return a new beam, smoothed over the frequency axis, but without decimation.
Parameters
----------
model
The model to use for smoothing.
fit_kwargs
Keyword arguments to pass to the model fit.
Returns
-------
beam
The Beam object smoothed over its same frequencies.
"""
if len(self.frequency) < 3:
raise ValueError(
"Can't freq-interpolate beam that has fewer than three frequencies."
)
# Frequency smoothing
smooth_beam = np.zeros_like(self.beam)
cached_model = model.at(x=self.frequency.to_value("MHz"))
for i, bm in enumerate(self.beam.T):
for j, b in enumerate(bm):
model_fit = cached_model.fit(ydata=b, **fit_kwargs)
smooth_beam[:, j, i] = model_fit.evaluate()
return Beam(
frequency=self.frequency,
azimuth=self.azimuth,
elevation=self.elevation,
beam=smooth_beam,
)
[docs]
@staticmethod
def shift_beam_maps(az_antenna_axis: float, beam_maps: np.ndarray) -> np.ndarray:
"""Rotate beam maps around an axis.
Parameters
----------
az_antenna_axis
The aximuth angle of the antenna axis.
beam_maps
Beam maps as a function of frequency, za and az.
Returns
-------
beam maps
Array of the same shape as the input, but rotated.
"""
if az_antenna_axis < 0:
index = -az_antenna_axis
bm1 = beam_maps[:, :, index::]
bm2 = beam_maps[:, :, 0:index]
return np.append(bm1, bm2, axis=2)
if az_antenna_axis > 0:
index = az_antenna_axis
bm1 = beam_maps[:, :, 0:(-index)]
bm2 = beam_maps[:, :, (360 - index) :]
return np.append(bm2, bm1, axis=2)
return beam_maps
[docs]
@classmethod
def resolve_file(
cls,
path: tp.PathLike,
band: str | None = None,
configuration: str = "default",
simulator: str = "feko",
) -> Path:
"""Resolve a file path to a standard location."""
if str(path) == ":":
if band is None:
raise ValueError("band must be given if path starts with a colon (:)")
# Get the default beam file.
return BEAM_PATH / band / f"{configuration}.txt"
if str(path).startswith(":"):
if band is None:
raise ValueError("band must be given if path starts with a colon (:)")
# Use a beam file in the standard directory.
return (
config.beams / f"{band}/simulations/{simulator}/{str(path)[1:]}"
).absolute()
return Path(path).absolute()
[docs]
@classmethod
def from_file(
cls,
band: str | None,
simulator: str = "feko",
beam_file: tp.PathLike = ":",
configuration: str = "default",
rotation_from_north: float = 90,
) -> Self:
"""Read a beam from file."""
beam_file = cls.resolve_file(beam_file, band, configuration, simulator)
if simulator == "feko":
out = cls.from_feko(beam_file, az_antenna_axis=rotation_from_north)
elif simulator == "wipl-d": # Beams from WIPL-D
out = cls.from_wipld(beam_file, rotation_from_north)
elif simulator == "hfss":
out = cls.from_hfss(beam_file)
else:
raise ValueError(f"Unknown value for simulator: '{simulator}'")
out.instrument = band
return out
[docs]
def angular_interpolator(
self,
freq_indx: int,
interp_kind: Literal[
"linear",
"nearest",
"slinear",
"quintic",
"pchip",
"spline",
"sphere-spline",
] = "linear",
) -> Callable[[np.ndarray, np.ndarray], np.ndarray]:
"""Return a callable function that interpolates the beam.
The returned function has the signature ``interp(az, el)``, where ``az`` is
azimuth in degrees, and ``el`` is elevation in degrees. They may be arrays,
in which case they should be the same length.
"""
beam = self.beam[freq_indx]
if interp_kind == "sphere-spline":
el = self.elevation * np.pi / 180 + np.pi / 2
if el[-1] > 0.999 * np.pi:
el = el[:-1]
beam = beam[:-1]
spl = spi.RectSphereBivariateSpline(
el,
self.azimuth * np.pi / 180,
beam,
pole_values=(None, beam[-1]),
pole_exact=True,
)
return lambda az, el: spl(
el * np.pi / 180 + np.pi / 2, az * np.pi / 180, grid=False
)
az = np.concatenate([self.azimuth, [self.azimuth[0] + 360]])
beam = np.hstack((beam, beam[:, 0][:, None]))
if interp_kind == "spline":
spl = spi.RectBivariateSpline(az, self.elevation, beam.T, kx=3, ky=3, s=0)
return lambda az, el: spl(az, el, grid=False)
spl = spi.RegularGridInterpolator(
(az, self.elevation),
beam.T,
method=interp_kind,
)
return lambda az, el: spl(np.array([az, el]).T)
[docs]
def between_freqs(
self, low: tp.FreqType = 0 * u.MHz, high: tp.FreqType = np.inf * u.MHz
) -> Self:
"""Return a new :class:`Beam` object restricted to a given frequency range."""
mask = (self.frequency >= low) & (self.frequency <= high)
return attrs.evolve(self, frequency=self.frequency[mask], beam=self.beam[mask])
[docs]
def get_beam_solid_angle(self) -> float:
"""Calculate the integrated beam solid angle."""
sin_theta = np.cos(self.elevation * (np.pi / 180))
sin_theta = np.tile(sin_theta, (len(self.azimuth), 1)).T
beam_integration = np.sum(self.beam * sin_theta, axis=(1, 2))
d_el = self.elevation[1] - self.elevation[0]
d_az = self.azimuth[1] - self.azimuth[0]
return d_el * d_az * (np.pi / 180) ** 2 * beam_integration
[docs]
def compute_ground_loss(self):
"""Compute the ground loss for the beam."""
raise NotImplementedError("We need to get this right...")