"""Functions for averaging arrays.
There are multiple methods in this module, due to the need for careful averages/binning
done in different ways. There are ultimately three axes over which we might bin spectra:
nights, LST/GHA and frequency. Each of these in fact requires slightly different methods
for averaging, in order to make the average unbiased (given flags).
"""
import contextlib
import numpy as np
from astropy import units as un
from ..tools import slice_along_axis
[docs]
def get_binned_weights(
x: np.ndarray,
bins: np.ndarray,
weights: np.ndarray | None = None,
include_left: bool = True,
include_right: bool = True,
) -> np.ndarray:
"""
Get the total weight in each bin for a given vector.
Parameters
----------
x
The input co-ordinates (1D).
bins
The bin edges into which to bin the x.
weights
Array with last dimension the same length as x. Input weights. Default is
all ones.
include_left
Whether to include coordinates to the left of the minimum bin in the first bin.
include_right
Whether to include coordinates to the right of the maximum bin in the last bin.
Note that for historical reasons, this is True, but it should probably be set
to False for typical cases.
Returns
-------
weights
Output bin weights, with shape equal to the input weights, but with the last
dimension replaced by ``len(bins) - 1``.
"""
bins = np.asarray(bins)
x = np.asarray(x)
if bins.size < 2:
raise ValueError("Bin edges must have at least 2 elements.")
if weights is None:
weights = np.ones_like(x)
if not np.all(np.diff(bins) > 0):
raise ValueError("Bin edges must be monotonically increasing!")
if weights.shape[-1] != len(x):
raise ValueError("Weights must have the same last axis shape as x.")
if include_right:
# In this case, NaNs and Infs will get put in the right-most bin,
# which is not what we want.
mask = np.isfinite(x)
x = x[mask]
weights = weights[..., mask]
out = np.zeros((*weights.shape[:-1], len(bins) - 1))
indices = np.digitize(x, bins) - 1
if include_left:
indices[indices < 0] = 0
if include_right:
indices[indices >= (len(bins) - 1)] = len(bins) - 2
for indx in np.ndindex(*out.shape[:-1]):
out[indx] = np.bincount(indices, weights=weights[indx], minlength=out.shape[-1])
return out
[docs]
def get_bin_edges(
coords: np.ndarray,
bins: np.ndarray | un.Quantity | float | None = None,
start: float | un.Quantity | None = None,
stop: float | un.Quantity | None = None,
) -> np.ndarray | un.Quantity:
"""Get bin edges given input coordinates and a simple description of the binning.
Parameters
----------
coords
The input co-ordinates to bin. These must be regular and monotonically
increasing.
bins
The bin *edges* (lower inclusive, upper not inclusive). If an ``int``, simply
use ``bins`` coords per bin, starting from the first bin. If a float, use
equi-spaced bin edges, starting from the start of coords, and ending past the
end of coords. If an array, assumed to be the bin edges.
If not provided, assume a single bin encompassing all the data.
start
Where to start the bin edges when ``bins`` is an int or float. Defaults to
the first coordinate minus half of the median coordinate difference.
stop
Where to stop the bin edges when ``bins`` is an int or float. Defaults to
the last coordinate plus half of the median coordinate difference.
Returns
-------
np.ndarray
The bin edges.
Notes
-----
This function is robust to the input coordinates being astropy Quantities, and will
return a quantity if the coordinates are.
"""
unit = getattr(coords, "unit", 1)
coords = getattr(coords, "value", coords)
if coords.size < 2:
raise ValueError("coords must have at least 2 elements.")
diffs = np.diff(coords)
dx = np.median(diffs)
if not np.all(diffs > 0):
raise ValueError("coords must be monotonically increasing!")
if not np.allclose(diffs, dx, rtol=1e-3):
raise ValueError("coords must be regularly spaced!")
start = coords[0] - dx / 2 if start is None else getattr(start, "value", start)
stop = coords[-1] + dx / 2 if stop is None else getattr(stop, "value", stop)
if bins is None:
return np.array([start, stop]) * unit
if not isinstance(bins, un.Quantity) and hasattr(bins, "__len__"):
return np.array(bins)
if isinstance(bins, un.Quantity) and not bins.isscalar:
return bins
if isinstance(bins, int):
if len(coords) % bins != 0:
return (coords[::bins] - dx / 2) * unit
edges = coords[::bins] - dx / 2
return np.concatenate((edges, [coords[-1] + dx / 2])) * unit
if unit != 1:
with contextlib.suppress(AttributeError):
bins = bins.to_value(unit)
return np.arange(start, stop + bins, bins) * unit
[docs]
def weighted_sum(
data: np.ndarray,
weights: np.ndarray | None = None,
normalize: bool = False,
axis: int = -1,
fill_value: float = np.nan,
keepdims: bool = False,
) -> tuple[np.ndarray, np.ndarray]:
"""Perform a careful weighted sum.
This routine is 'careful' in that it allows performing sums when the total weight
is zero, setting those values to a user-defined filling value.
Parameters
----------
data : array-like
The data over which the weighted mean is to be taken.
weights : array-like, optional
Same shape as data, giving the weights of each datum. By default,
all weights are unity.
normalize : bool, optional
If True, normalize weights so that the maximum weight is unity.
axis : int, optional
The axis over which to take the mean.
fill_value : float, optional
The value to fill in where the sum of the weights is zero.
keepdims : bool, optional
Whether to keep the original dimensions of ``data`` (i.e. have an axis
with length one for ``axis``).
Returns
-------
datasum
The weighted sum over `axis`, where elements with zero total weight are
set to ``fill_value``.
sumweights
The sum of the weights over `axis`.
"""
if weights is None:
weights = np.ones_like(data)
if data.shape != weights.shape:
raise ValueError("data and weights must have the same shape.")
if normalize:
weights = weights.copy() / np.nanmax(weights)
weights = np.where(np.isnan(data), 0, weights)
sm = np.nansum(data * weights, axis=axis, keepdims=keepdims, dtype=float)
weights = np.nansum(weights, axis=axis, keepdims=keepdims, dtype=float)
if hasattr(sm, "__len__"):
sm[weights == 0] = fill_value
elif weights == 0:
sm = fill_value
return sm, weights
[docs]
def weighted_mean(
data: np.ndarray,
weights: np.ndarray | None = None,
axis: int = -1,
fill_value: float = np.nan,
keepdims: bool = False,
) -> tuple[np.ndarray, np.ndarray]:
"""Perform a careful weighted mean where zero-weights don't error.
In this function, if the total weight is zero, fill_value is returned.
Parameters
----------
data : array-like
The data over which the weighted mean is to be taken.
weights : array-like, optional
Same shape as data, giving the weights of each datum.
axis : int, optional
The axis over which to take the mean.
fill_value : float, optional
The value to fill in where the sum of the weights is zero.
keepdims : bool, optional
Whether to keep the original dimensions of ``data`` (i.e. have an axis
with length one for ``axis``).
Returns
-------
avg
The weighted mean over `axis`, where elements with zero total weight are
set to ``fill_value``.
weights
The sum of the weights over `axis`.
"""
sm, weights = weighted_sum(
data, weights, axis=axis, keepdims=keepdims, fill_value=fill_value
)
mask = weights > 0
if not hasattr(sm, "__len__"):
return (sm / weights, weights) if mask else (fill_value, weights)
av = np.zeros_like(sm)
av[mask] = sm[mask] / weights[mask]
av[~mask] = fill_value
return av, weights
[docs]
def weighted_variance(
data: np.ndarray,
nsamples: np.ndarray | None = None,
avg: np.ndarray | None = None,
**kwargs,
):
"""Calculate a careful weighted variance.
Simply calculates the weighted mean of [(data - mean)/sigma]^2 over the data, where
the weights are 1/sigma^2. This is useful for computing the expected standard
deviation when the intrinsic variance and number of samples of each datum are known.
Parameters
----------
data : array-like
The data over which to calculate the variance.
nsamples
The number of samples corresponding to each datum. These will be used as
weights. Default is all unity.
avg
The weighted average of the data over the given axis. By default, compute this
internally.
Returns
-------
std
The weighted variance of the data over the given axis.
sumweights
The sum of the nsamples**2 over the given axis.
"""
if avg is None:
avg, _ = weighted_mean(data, weights=nsamples, keepdims=True, **kwargs)
return weighted_mean((data - avg) ** 2, nsamples**2, **kwargs)[0]
[docs]
def bin_data(
data: np.ndarray,
residuals: np.ndarray | None = None,
weights: np.ndarray | None = None,
bins: list[np.ndarray | slice] | None = None,
axis: int = -1,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Bin data, in an un-biased way if possible.
This uses the estimator from memo #183:
http://loco.lab.asu.edu/wp-content/uploads/2020/10/averaging_with_weights.pdf.
Parameters
----------
data
The data to be binned.
residuals
The residuals of the data, if known. If not provided, and weights is non
uniform, the average will be biased.
weights
The weights of the data. If not provided, assume all weights are unity.
bins
The bins into which to bin the data. If not provided, assume a single bin
encompassing all the data. Each element should be either an array that
indexes into the axis over which to bin, or a slice object.
axis
The axis over which to bin.
Returns
-------
data
The binned data.
weights
The weights of the binned data.
residuals
The binned residuals (if provided).
"""
if residuals is not None:
model = data - residuals
if axis < 0:
axis += data.ndim
if bins is None:
bins = [slice(None)]
shape = list(data.shape)
shape[axis] = len(bins)
outd = np.zeros(tuple(shape), dtype=data.dtype)
outw = np.zeros(tuple(shape), dtype=float)
if residuals is not None:
outr = np.zeros(tuple(shape), dtype=residuals.dtype)
else:
outr = None
ell = (slice(None),) * axis
for ibin, bn in enumerate(bins):
w = slice_along_axis(weights, bn, axis=axis) if weights is not None else None
if residuals is not None:
m = slice_along_axis(model, bn, axis=axis)
r = slice_along_axis(residuals, bn, axis=axis)
m = weighted_mean(m, axis=axis)[0]
r, w = weighted_mean(r, weights=w, axis=axis)
d = r + m
else:
d = slice_along_axis(data, bn, axis=axis)
d, w = weighted_mean(d, weights=w, axis=axis)
outd[*ell, ibin] = d
outw[*ell, ibin] = w
if residuals is not None:
outr[*ell, ibin] = r
return outd, outw, outr
[docs]
def bin_array_unweighted(x: np.ndarray, size: int = 1) -> np.ndarray:
"""Simple unweighted mean-binning of an array.
Parameters
----------
x
The array to be binned. Only the last axis will be binned.
size
The size of the bins.
Notes
-----
The last axis of `x` is binned. It is assumed that the coordinates corresponding
to `x` are regularly spaced, so the final average just takes `size` values and
averages them together.
If the array is not divisible by `size`, the last values are left out.
Examples
--------
Simple 1D example::
>>> x = np.array([1, 1, 2, 2, 3, 3])
>>> bin_array(x, size=2)
[1, 2, 3]
The last remaining values are left out::
>>> x = np.array([1, 1, 2, 2, 3, 3, 4])
>>> bin_array(x, size=2)
[1, 2, 3]
"""
if size == 1:
return x
n = x.shape[-1]
nn = size * (n // size)
return np.nanmean(x[..., :nn].reshape((*x.shape[:-1], -1, size)), axis=-1)