Source code for edges.averaging.lstbin

"""Functions for doing LST binning on GSData objects."""

import itertools
import logging
import warnings

import numpy as np
import pygsdata.coordinates as crd
from astropy import units as un
from astropy.coordinates import Longitude
from astropy.time import Time
from pygsdata import GSData, GSFlag, gsregister
from pygsdata.coordinates import lsts_to_times
from pygsdata.utils import angle_centre

from .averaging import bin_data
from .utils import NsamplesStrategy, get_weights_from_strategy

logger = logging.getLogger(__name__)


[docs] def get_lst_bins( binsize: float, first_edge: float = 0, max_edge: float = 24 ) -> np.ndarray: """Determine LST bins given a bin size and first edge, in hours. This function will return equi-spaced bins starting at `first_edge` and with width `binsize`. The last bin will be less than or equal to `max_edge`, after accounting for wrapping at 24 hours. Parameters ---------- binsize The size of the bins in hours. first_edge The first edge of the first bin. max_edge The maximum edge of the last bin. Returns ------- np.ndarray The LST bin edges. """ if binsize > 24: raise ValueError("Binsize must be <= 24 hours") if binsize <= 0: raise ValueError("Binsize must be greater than 0 hours") first_edge %= 24 while max_edge <= (first_edge + 1e-4): max_edge += 24 bins = np.arange(first_edge, max_edge, binsize) if np.isclose(bins.max() + binsize, max_edge): bins = np.append(bins, max_edge) return bins
[docs] @gsregister("reduce") def average_over_times( data: GSData, nsamples_strategy: NsamplesStrategy = NsamplesStrategy.FLAGGED_NSAMPLES, reference_lst: Longitude = Longitude(12 * un.hour), use_resids: bool | None = None, fill_value: float = 0.0, ) -> GSData: """Average a GSData object over the time axis. Parameters ---------- data The data over which to average. nsamples_strategy The strategy to use when defining the weights of each sample. See :class:`~edges.analysis.averaging.NsamplesStrategy` for more information. reference_lst An LST set as the central LST when finding the new mean LST. All LSTs will be wrapped within 12 hours of this reference before taking the mean. use_resids : bool, optional Whether to average the residuals and add them back to the mean model, or simply average the data directly. fill_value : float The value to impute when no data exists in a bin. """ if use_resids is None: use_resids = data.residuals is not None if use_resids and data.residuals is None: raise ValueError("Cannot average residuals without a model.") w, n = get_weights_from_strategy(data, nsamples_strategy) ntot = np.sum(w, axis=-2) nsamples_tot = np.sum(n, axis=-2) if use_resids: sum_resids = np.nansum(data.residuals * w, axis=-2) mean_resids = sum_resids / ntot mean_model = np.nanmean(data.model, axis=-2) new_data = mean_model + mean_resids else: sum_data = np.nansum(data.data * w, axis=-2) new_data = sum_data / ntot new_data[np.isnan(new_data)] = fill_value # The new time will be the mean unflagged time ww = np.any(w > 0, axis=(0, 1, 3)) times = Time(np.atleast_2d(np.mean(data.times.jd[ww], axis=0)), format="jd") time_ranges = Time( np.array([ [ data.time_ranges.jd[ww].min(axis=(0, 2)), data.time_ranges.jd[ww].max(axis=(0, 2)), ] ]).transpose((0, 2, 1)), format="jd", ) # Wrap the LSTs into +-12 hours of the reference LST. # Note that we de-unit the quantities to do the wrapping # because astropy does weird things when trying to wrap # a Longitude/Angle lsts = data.lsts.hour.copy() lsts[lsts <= reference_lst.hour - 12] += 24 lsts[lsts > reference_lst.hour + 12] -= 24 lst_ranges = data.lst_ranges.hour.copy() lst_ranges[lst_ranges <= reference_lst.hour - 12] += 24 lst_ranges[lst_ranges > reference_lst.hour + 12] -= 24 if data.auxiliary_measurements is not None: new_aux = { key: np.array([np.nanmean(data.auxiliary_measurements[key])]) for key in data.auxiliary_measurements.columns } else: new_aux = None return data.update( data=new_data[:, :, None, :], residuals=mean_resids[:, :, None, :] if use_resids else None, times=times, time_ranges=time_ranges, lsts=Longitude(np.atleast_2d(np.mean(lsts[ww], axis=0)) * un.hourangle), lst_ranges=Longitude( np.array([ [ lst_ranges[ww, :, 0].min(axis=0), lst_ranges[ww, :, 1].max(axis=0), ] ]).transpose((0, 2, 1)) * un.hourangle ), effective_integration_time=np.mean(data.effective_integration_time, axis=2)[ :, :, None ], nsamples=nsamples_tot[:, :, None], flags={}, auxiliary_measurements=new_aux, )
[docs] @gsregister("reduce") def lst_bin( data: GSData, binsize: float = 24.0, first_edge: float = 0.0, max_edge: float = 24.0, in_gha: bool = False, use_model_residuals: bool | None = None, reference_time: float | Time | str = "mean", ): """Average data within bins of LST. Parameters ---------- data The :class:`GSData` object to bin. binsize The size of the LST bins in hours. first_edge The first edge of the first bin in hours. max_edge The maximum edge of the last bin in hours, see :func:`get_lst_bins`. in_gha Whether to bin in GHA or LST (default). use_model_residuals Whether to use the model residuals to de-bias the mean in each bin. If True, either `model` must be provided, or `residuals` must be specified on the GSData object. reference_time The JD at which to reference the LSTs to in the output. The JDs of the output will be exactly at the centre of each LST bin, but the _day_ to which they are referenced will be set by the `reference_time` (all will be within 24 hours of this time). This can be a float (JD), an astropy Time, or one of 'min', 'max', 'mean' or 'closest' (default). Options 'min', 'max' and 'mean' will use the corresponding min/max/mean time in the data object to set the reference time. Returns ------- GSData A new :class:`GSData` object with the binned data. """ if use_model_residuals is None: use_model_residuals = data.residuals is not None if use_model_residuals and data.residuals is None: raise ValueError("Cannot bin with models without residuals!") if in_gha: first_edge = crd.gha2lst(first_edge * un.hourangle).to_value("hourangle") max_edge = crd.gha2lst(max_edge * un.hourangle).to_value("hourangle") bins = get_lst_bins(binsize, first_edge, max_edge=max_edge) logger.debug(f"Got LST bins: {bins}") lsts = data.lsts.hour.copy() lsts %= 24 lsts[lsts < bins[0]] += 24 spec = np.zeros((data.nloads, data.npols, len(bins) - 1, data.nfreqs)) resids = np.zeros_like(spec) if use_model_residuals else None nsmpls = np.zeros_like(spec) for iload in range(data.nloads): bbins = [ (b[0] <= lsts[:, iload]) & (lsts[:, iload] < b[1]) for b in itertools.pairwise(bins) ] spec[iload], nsmpls[iload], r = bin_data( data.data[iload], residuals=data.residuals[iload] if use_model_residuals else None, weights=data.flagged_nsamples[iload], bins=bbins, axis=-2, ) if use_model_residuals: resids[iload] = r lst_ranges = np.tile( np.array(list(itertools.pairwise(bins)))[:, None], (1, data.nloads, 1) ) lst_ranges = Longitude(lst_ranges * un.hour) lstbins = angle_centre(lst_ranges[..., 0], lst_ranges[..., 1]) # Flag anything that is all nan -- these are just empty LSTs. flg = GSFlag(flags=np.all(np.isnan(spec), axis=(0, 1, 3)), axes=("time",)) if data.auxiliary_measurements is not None: warnings.warn("Auxiliary measurements cannot be binned!", stacklevel=2) if data._effective_integration_time.size > 1: if np.unique(data._effective_integration_time).size > 1: warnings.warn( "lstbin does not yet support variable integration times!", stacklevel=2 ) intg_time = np.mean(data._effective_integration_time) else: intg_time = data._effective_integration_time # Determine a reference time. The output GSDatawill still need to have "times" in JD # which should be exactly at the centre of each LST bin. However, the choice of # which JD each LST bin should correspond to is somewhat arbitrary. if reference_time == "min": reference_time = data.times.min() elif reference_time == "max": reference_time = data.times.max() elif reference_time == "mean": reference_time = data.times.mean() times = lsts_to_times( np.where(lstbins < lstbins[0, 0], lstbins + 2 * np.pi * un.rad, lstbins), ref_time=reference_time, location=data.telescope.location, ) time_ranges = lsts_to_times( np.where( lst_ranges < lst_ranges[0, 0, 0], lst_ranges + 2 * np.pi * un.rad, lst_ranges, ), ref_time=reference_time, location=data.telescope.location, ) return data.update( data=spec, residuals=resids, nsamples=nsmpls, flags={"empty_lsts": flg}, lsts=lstbins, lst_ranges=lst_ranges, times=times, time_ranges=time_ranges, auxiliary_measurements=None, effective_integration_time=intg_time, )