Source code for edges_analysis.averaging.averaging

"""Methods for averaging arrays.

There are multiple methods in this module, due to the need for careful averages/binning
done in different ways. There are ultimately three axes over which we might bin spectra:
nights, LST/GHA and frequency. Each of these in fact requires slightly different methods
for averaging, in order to make the average unbiased (given flags).
"""

from __future__ import annotations

import contextlib
import numpy as np
from astropy import units as un
from edges_cal import modelling as mdl

from ..tools import slice_along_axis


[docs] def get_binned_weights( x: np.ndarray, bins: np.ndarray, weights: np.ndarray | None = None ) -> np.ndarray: """ Get the total weight in each bin for a given vector. Parameters ---------- x The input co-ordinates (1D). bins The bins into which to bin the x. weights Array with last dimension the same length as x. Input weights. Returns ------- weights Output bin weights. """ out = np.zeros(weights.shape[:-1] + (len(bins) - 1,)) indices = np.digitize(x, bins) - 1 indices[indices >= (len(bins) - 1)] -= 1 for indx in np.ndindex(*out.shape[:-1]): out[indx] = np.bincount(indices, weights=weights[indx], minlength=out.shape[-1]) return out
[docs] def get_bin_edges( coords: np.ndarray, bins: np.ndarray | un.Quantity | int | float | None = None, start: float | un.Quantity | None = None, stop: float | un.Quantity | None = None, ) -> np.ndarray: """Get bin edges given input coordinates and a simple description of the binning. Parameters ---------- coords The input co-ordinates to bin. These must be regular and monotonically increasing. bins The bin *edges* (lower inclusive, upper not inclusive). If an ``int``, simply use ``bins`` samples per bin, starting from the first bin. If a float, use equi-spaced bin edges, starting from the start of coords, and ending past the end of coords. If not provided, assume a single bin encompassing all the data. """ unit = getattr(coords, "unit", 1) coords = getattr(coords, "value", coords) diffs = np.diff(coords) dx = np.median(diffs) if not np.all(diffs > 0): raise ValueError("coords must be monotonically increasing!") if not np.allclose(diffs, dx, rtol=1e-3): raise ValueError("coords must be regularly spaced!") if start is None: start = coords[0] - dx / 2 else: start = getattr(start, "value", start) if stop is None: stop = coords[-1] + dx / 2 else: stop = getattr(stop, "value", stop) if bins is None: return np.array([start, stop]) * unit elif not isinstance(bins, un.Quantity) and hasattr(bins, "__len__"): return np.array(bins) elif isinstance(bins, un.Quantity) and not bins.isscalar: return bins elif isinstance(bins, int): # works if its an integer if len(coords) % bins == 0: edges = coords[::bins] - dx / 2 return np.concatenate((edges, [coords[-1] + dx])) * unit else: return (coords[::bins] - dx / 2) * unit else: if unit != 1: with contextlib.suppress(AttributeError): bins = bins.to_value(unit) return np.arange(start, stop, bins) * unit
[docs] def bin_array_biased_regular( data: np.ndarray, weights: np.ndarray | None = None, coords: np.ndarray | None = None, axis: int = -1, bins: np.ndarray | int | float | None = None, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Bin arbitrary-dimension data carefully along an axis. There are multiple ways to "bin" data along an axis when provided with weights. It is not typically accurate to return equi-spaced bins where data is averaged simply via summing with the weights (and the bin coords represent the centre of each bin). This results in some bias when the weights are not uniform. One way around this is to assume some underlying model and "fill in" the lower-weight bins. This would allow equi-spaced estimates. However, this function does something simpler -- it returns non-equi-spaced bins. This can be a little annoying if multiple data are to be binned, because one needs to keep track of the coordinates of each data separately. However, it is simple and accurate. Parameters ---------- data The data to be binned. May be of arbitrary dimension. weights The weights of the data. Must be the same shape as ``data``. If not provided, assume all weights are unity. coords The coordinates of the data along the axis to be averaged. If not provided, is taken to be the indices over the axis. axis The axis over which to bin. bins The bin *edges* (lower inclusive, upper not inclusive). If an ``int``, simply use ``bins`` samples per bin, starting from the first bin. If a float, use equi-spaced bin edges, starting from the start of coords, and ending past the end of coords. If not provided, assume a single bin encompassing all the data. Returns ------- coords The weighted average of the coordinates in each bin. If there is no weight in a bin """ axis %= data.ndim if weights is None: weights = np.ones(data.shape, dtype=float) if data.shape != weights.shape: raise ValueError("data and weights must have same shape") if coords is None: coords = np.arange(data.shape[axis]) if len(coords) != data.shape[axis]: raise ValueError("coords must be same length as the data along the given axis.") bins = get_bin_edges(coords, bins) # Get a list of tuples of bin edges bins = [(b, bins[i + 1]) for i, b in enumerate(bins[:-1])] # Generate the shape of the outputs by contracting one axis. out_shape = tuple(d if i != axis else len(bins) for i, d in enumerate(data.shape)) out_data = np.ones(out_shape) * np.nan out_wght = np.zeros(out_shape) for i, (lower, upper) in enumerate(bins): mask = np.where((coords >= lower) & (coords < upper))[0] if len(mask) > 0: this_data = data.take(mask, axis=axis) this_wght = weights.take(mask, axis=axis) this_slice = tuple( slice(None) if ax != axis else i for ax in range(data.ndim) ) out_data[this_slice], out_wght[this_slice] = weighted_mean( this_data, this_wght, axis=axis ) centres = [(b[0] + b[1]) / 2 for b in bins] if hasattr(centres[0], "unit"): centres = np.array([c.value for c in centres]) * centres[0].unit else: centres = np.array(centres) return centres, out_data, out_wght
[docs] def bin_array_unbiased_irregular( data: np.ndarray, weights: np.ndarray | None = None, coords: np.ndarray | None = None, axis: int = -1, bins: np.ndarray | int | float | None = None, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Bin arbitrary-dimension data carefully along an axis. There are multiple ways to "bin" data along an axis when provided with weights. It is not typically accurate to return equi-spaced bins where data is averaged simply via summing with the weights (and the bin coords represent the centre of each bin). This results in some bias when the weights are not uniform. One way around this is to assume some underlying model and "fill in" the lower-weight bins. This would allow equi-spaced estimates. However, this function does something simpler -- it returns non-equi-spaced bins. This can be a little annoying if multiple data are to be binned, because one needs to keep track of the coordinates of each data separately. However, it is simple and accurate. Parameters ---------- data The data to be binned. May be of arbitrary dimension. weights The weights of the data. Must be the same shape as ``data``. If not provided, assume all weights are unity. coords The coordinates of the data along the axis to be averaged. If not provided, is taken to be the indices over the axis. axis The axis over which to bin. bins The bin *edges* (lower inclusive, upper not inclusive). If an ``int``, simply use ``bins`` samples per bin, starting from the first bin. If a float, use equi-spaced bin edges, starting from the start of coords, and ending past the end of coords. If not provided, assume a single bin encompassing all the data. Returns ------- coords The weighted average of the coordinates in each bin. If there is no weight in a bin """ axis %= data.ndim if weights is None: weights = np.ones(data.shape, dtype=float) if data.shape != weights.shape: raise ValueError("data and weights must have same shape") if coords is None: coords = np.arange(data.shape[axis]) if len(coords) != data.shape[axis]: raise ValueError("coords must be same length as the data along the given axis.") bins = get_bin_edges(coords, bins) # Get a list of tuples of bin edges bins = [(b, bins[i + 1]) for i, b in enumerate(bins[:-1])] # Generate the shape of the outputs by contracting one axis. out_shape = tuple(d if i != axis else len(bins) for i, d in enumerate(data.shape)) out_data = np.ones(out_shape) * np.nan out_wght = np.zeros(out_shape) out_coords = np.ones(out_shape) * np.nan init_shape = tuple( data.shape[-1] if i == axis else d for i, d in enumerate(data.shape[:-1]) ) for i, (lower, upper) in enumerate(bins): mask = np.where((coords >= lower) & (coords < upper))[0] if len(mask) > 0: this_data = data.take(mask, axis=axis) this_wght = weights.take(mask, axis=axis) this_crd = np.swapaxes( np.broadcast_to(coords[mask], init_shape + (len(coords[mask]),)), axis, -1, ) this_slice = tuple( slice(None) if ax != axis else i for ax in range(data.ndim) ) out_data[this_slice], out_wght[this_slice] = weighted_mean( this_data, this_wght, axis=axis ) out_coords[this_slice], _ = weighted_mean(this_crd, this_wght, axis=axis) return out_coords, out_data, out_wght
[docs] def bin_freq_unbiased_irregular( spectrum: list | np.ndarray, freq: list | np.ndarray | None = None, weights: list | np.ndarray | None = None, resolution: float | int | None = None, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Average a spectrum, with weights, in frequency. The average is optionally taken within bins along the frequency axis. Under the hood, uses :func:`bin_array_unbiased_irregular`, just adding some nicer call parameters. Parameters ---------- spectrum : array-like The spectrum to average. Frequency axis is the last axis. freq : array-like, optional The frequencies along which to average. If provided, must be the same shape as ``spectrum``. Must be provided if either ``resolution`` or ``n_samples`` is provided. weights : array-like, optional The weights of the weighted averaged. If provided, same shape as ``spectrum``. If not provided, all weights are considered to be one. resolution : float, optional The (frequency) resolution with which to perform the average, in same units as ``freq``. For example, if an array of frequencies with resolution 0.1 MHz is passed in, and ``resolution`` is 0.2, the output array will contain half the number of bins. Default is to average the whole array. Returns ------- freq An array with length determined automatically by the routine, giving the mean frequency in each output bin. Note that the frequencies in each row may be different. spec Array of same length as ``freq`` containing the weighted-average spectrum w : array Array of same length as ``freq`` containing the total weight in each bin. Examples -------- >>> freq = np.linspace(0.1, 1, 10) >>> spectrum = [0, 2] * 5 >>> f, s, w = bin_freq_unbiased_irregular(spectrum, freq=freq, resolution=0.2) >>> f [0.15, 0.35, 0.55, 0.75, 0.95] >>> s [1, 1, 1, 1, 1] >>> w [1, 1, 1, 1, 1] """ if resolution and freq is None: raise ValueError("You must provide freq if resolution is provided!") if freq is not None and len(freq) != spectrum.shape[-1]: raise ValueError( f"provided freq ({len(freq)}) does not match final axis of spectrum " f"{spectrum.shape}!" ) out_freq, out_spec, out_wght = bin_array_unbiased_irregular( spectrum, weights=weights, coords=freq, bins=resolution ) return out_freq, out_spec, out_wght
[docs] def bin_freq_unbiased_regular( model: mdl.Model, params: np.ndarray, freq: np.ndarray, resids: np.ndarray, weights: np.ndarray, resolution: float | int | None = None, **fit_kwargs, ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Bin an array along the frequency axis into *regular* bins. To make the bins regular, we use a model over the frequency axis to evaluate central value, and add the residuals to it. Parameters ---------- model_type The model for which the residuals are defined. params 2D array of parameters (parameters for each model along last axis). freq The frequencies of the input resids The residuals of the input model weights The weights of the data. resolution The resolution of the new data. fit_kwargs Anything passed to construct the :class:`ModelFit` instance, example ``method``. Returns ------- freq The new frequencies weights The binned weights spec The binned spectrum resids The new residuals to the same model params The new parameters of the same model """ if resolution is None or resolution: new_f_edges = get_bin_edges(freq, resolution) new_f = (new_f_edges[1:] + new_f_edges[:-1]) / 2 else: new_f = freq model = model.at(x=new_f) ev = np.array([model(parameters=p) for p in params]) if resolution != 0: _, r, w = bin_array_unbiased_irregular( resids, weights=weights, coords=freq, bins=resolution, ) else: r = resids w = weights s = ev + r new_r = [] new_p = [] for ss, ww in zip(s, w): m = model.fit(ss, ww, **fit_kwargs) new_r.append(m.residual) new_p.append(m.model_parameters) return new_f, w, s, np.array(new_r), np.array(new_p)
[docs] def bin_gha_unbiased_regular( params: np.ndarray, resids: np.ndarray, weights: np.ndarray, gha: np.ndarray, bins: np.ndarray, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """ Bin data in an unbiased way using a model fit. See memo #183: http://loco.lab.asu.edu/wp-content/uploads/2020/10/averaging_with_weights.pdf) Data can be in the form of multiple observations, each of which has a "model" fit to it (eg. multiple GHA's with a model over frequency for each). Parameters ---------- params Model parameters for each data point. Shape should be (Nobs, Nterms). resids Residuals of the models to data. Shape should be (Nobs, Ncoords) weights Weights of the data/residuals. Shape should be same as resids. gha The GHA coordinates to bin. bins The bins into which the GHA should be fit Returns ------- params An array giving the averaged parameters of the model resids An array giving residuals in the averaged bins. weights The new weights after averaging. """ params_out = np.nan * np.ones((len(bins) - 1, params.shape[-1])) resids_out = np.nan * np.ones((len(bins) - 1, resids.shape[-1])) weights_out = np.zeros_like(resids_out) gha %= 24 assert gha.ndim == 1 for i, bin_low in enumerate(bins[:-1]): bin_high = bins[i + 1] bin_low %= 24 bin_high %= 24 if bin_low < bin_high: mask = (gha >= bin_low) & (gha < bin_high) else: mask = (gha < bin_high) | (gha >= bin_low) if np.sum(mask) == 0: # Skip this bin if nothing's in it continue these_params = params[mask] these_resids = resids[mask] these_weights = weights[mask] # Take the nanmean, because some entire integrations/GHA's might have been # flagged and therefore have no applicable model. Then the params should be NaN. # A nanmean of all NaNs returns NaN, so that makes sense. params_out[i] = np.nanmean(these_params, axis=0) resids_out[i], weights_out[i] = weighted_mean( these_resids, weights=these_weights, axis=0 ) return params_out, resids_out, weights_out
[docs] def bin_spectrum_unbiased_regular( params: np.ndarray, resids: np.ndarray, weights: np.ndarray, gha: np.ndarray, gha_bins: np.ndarray, model: mdl.Model, freq: np.ndarray, resolution: float | int | None = None, ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Bin a spectrum in GHA and frequency in an unbiased and regular manner.""" p, r, w = bin_gha_unbiased_regular( params=params, resids=resids, weights=weights, gha=gha, bins=gha_bins ) new_f, w, s, new_r, new_p = bin_freq_unbiased_regular( model=model, params=p, freq=freq, resids=r, weights=w, resolution=resolution, ) return new_f, new_r, w, s, new_p
[docs] def weighted_sum(data, weights=None, normalize=False, axis=0): """A careful weighted sum. Parameters ---------- data : array-like The data over which the weighted mean is to be taken. weights : array-like, optional Same shape as data, giving the weights of each datum. normalize : bool, optional If True, normalize weights so that the maximum weight is unity. axis : int, optional The axis over which to take the mean. Returns ------- array-like : The weighted sum over `axis`, where elements with zero total weight are set to nan. """ if weights is None: weights = np.ones_like(data) if normalize: weights = weights.copy() / weights.nanmax() sm = np.nansum(data * weights, axis=axis) weights = np.nansum(weights, axis=axis) if hasattr(sm, "__len__"): sm[weights == 0] = np.nan elif weights == 0: sm = np.nan return sm, weights
[docs] def weighted_mean(data, weights=None, axis=0): """A careful weighted mean where zero-weights don't error. In this function, if the total weight is zero, np.nan is returned. Parameters ---------- data : array-like The data over which the weighted mean is to be taken. weights : array-like, optional Same shape as data, giving the weights of each datum. axis : int, optional The axis over which to take the mean. Returns ------- array-like : The weighted mean over `axis`, where elements with zero total weight are set to nan. """ sm, weights = weighted_sum(data, weights, axis=axis) mask = weights > 0 if isinstance(sm, float): return sm / weights if mask else np.nan, weights av = np.zeros_like(sm) av[mask] = sm[mask] / weights[mask] av[~mask] = np.nan return av, weights
[docs] def weighted_sorted_metric(data, weights=None, metric="median", **kwargs): """Semi-weighted integrator of data. This function will perform integrations of data that rely on sorting the data (eg. median or percentile). These are ony able to partial weighting -- i.e. weights of zero ensure that datum is ignored, while other weights all count the same. Parameters ---------- data : array-like The data over which the weighted mean is to be taken. weights : array-like, optional Same shape as data, giving the weights of each datum. metric : str, optional One of ('median', 'argmax', 'argmin','max', 'min', 'percentile', 'quantile'), specifying which metric to take. kwargs : Extra arguments to the function np.nan<metric>. Returns ------- array-like : The weighted mean over `axis`, where elements with zero total weight are set to nan. """ assert metric in ( "median", "argmax", "argmin", "max", "min", "percentile", "quantile", ) d = data.copy() d[weights == 0] = np.nan return getattr(np, "nan" + metric)(d, **kwargs)
[docs] def weighted_standard_deviation(av, data, std, axis=0): """Calcualte a careful weighted standard deviation.""" return np.sqrt(weighted_mean((data - av) ** 2, 1 / std**2, axis=axis)[0])
[docs] def bin_data( data: np.ndarray, residuals: np.ndarray | None = None, weights: np.ndarray | None = None, bins: list[np.ndarray | slice] | None = None, axis: int = -1, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Bin data, in an un-biased way if possible. This uses the estimator from memo #183: http://loco.lab.asu.edu/wp-content/uploads/2020/10/averaging_with_weights.pdf. Parameters ---------- data The data to be binned. residuals The residuals of the data, if known. If not provided, and weights is non uniform, the average will be biased. weights The weights of the data. If not provided, assume all weights are unity. bins The bins into which to bin the data. If not provided, assume a single bin encompassing all the data. Each element should be either an array that indexes into the axis over which to bin, or a slice object. axis The axis over which to bin. Returns ------- data The binned data. weights The weights of the binned data. residuals The binned residuals (if provided). """ if residuals is not None: model = data - residuals if axis < 0: axis += data.ndim shape = list(data.shape) shape[axis] = len(bins) outd = np.zeros(tuple(shape), dtype=data.dtype) outw = np.zeros(tuple(shape), dtype=float) if residuals is not None: outr = np.zeros(tuple(shape), dtype=residuals.dtype) else: outr = None ell = (slice(None),) * axis for ibin, bn in enumerate(bins): if weights is not None: w = slice_along_axis(weights, bn, axis=axis) else: w = None if residuals is not None: m = slice_along_axis(model, bn, axis=axis) r = slice_along_axis(residuals, bn, axis=axis) m = weighted_mean(m, axis=axis)[0] r, w = weighted_mean(r, weights=w, axis=axis) d = r + m else: d = slice_along_axis(data, bn, axis=axis) d, w = weighted_mean(d, weights=w, axis=axis) outd[ell + (ibin,)] = d outw[ell + (ibin,)] = w if residuals is not None: outr[ell + (ibin,)] = r return outd, outw, outr