Source code for edges.modeling.fitting

"""Fitting routines for models."""

from copy import copy
from ctypes import CDLL, POINTER, c_double, c_int, c_longdouble
from functools import cached_property
from pathlib import Path
from typing import Literal

import attrs
import numpy as np
import scipy as sp

from ..io.serialization import hickleable
from . import core


[docs] @hickleable @attrs.define(frozen=True, slots=False) class ModelFit: """A class representing a fit of model to data. Parameters ---------- model The evaluatable model to fit to the data. ydata The values of the measured data. weights The weight of the measured data at each point. This corresponds to the *variance* of the measurement (not the standard deviation). This is appropriate if the weights represent the number of measurements going into each piece of data. method The method to solve the linear least squares problem. This can be either 'lstsq', 'qr' or 'alan-qrd'. The 'leastsq' method uses the np.linalg.lstsq function, while the 'qr' method uses the np.linalg.solve function after scipy.linalg.qr. The 'alan-qr' method is a python-port of the QR decomposition algorithm found in Alan's C Codebase. Raises ------ ValueError If model_type is not str, or a subclass of :class:`Model`. """ model: core.FixedLinearModel = attrs.field() ydata: np.ndarray = attrs.field() weights: np.ndarray | float = attrs.field( default=1.0, validator=attrs.validators.instance_of((np.ndarray, float)) ) method: Literal["lstsq", "qr", "alan-qrd"] = attrs.field( default="lstsq", validator=attrs.validators.in_(["lstsq", "qr", "alan-qrd", "qrd-c"]), ) @ydata.validator def _ydata_vld(self, att, val): assert val.shape == self.model.x.shape @weights.validator def _weights_vld(self, att, val): if isinstance(val, np.ndarray): assert val.shape == self.model.x.shape
[docs] @cached_property def degrees_of_freedom(self) -> int: """The number of degrees of freedom of the fit.""" return self.model.x.size - self.model.model.n_terms - 1
[docs] @cached_property def fit(self) -> core.FixedLinearModel: """A model that has parameters set based on the best fit to this data.""" mask = np.isfinite(self.ydata) w = self.weights if np.isscalar(self.weights) else self.weights[mask] if self.method == "lstsq": if np.isscalar(self.weights): pars = self._ls(self.model.basis[:, mask], self.ydata[mask]) else: pars = self._wls(self.model.basis[:, mask], self.ydata[mask], w=w) elif self.method == "qr": pars = self._qr(self.model.basis[:, mask], self.ydata[mask], w=w) elif self.method == "qrd-c": pars = self._qrd_c(self.model.basis[:, mask], self.ydata[mask], w=w) elif self.method == "alan-qrd": pars = self._alan_qrd(self.model.basis[:, mask], self.ydata[mask], w=w) # Create a new model with the same parameters but specific parameters and xdata. return self.model.with_params(parameters=pars)
def _qr(self, basis: np.ndarray, y: np.ndarray, w: np.ndarray) -> np.ndarray: """Solve a linear system using QR decomposition. Here the system is defined as A*theta = y, where A is an (n, m) matrix of basis vectors, y is an (n,) vector of data, and theta is an (m,) vector of parameters. See: http://www2.imm.dtu.dk/pubdb/views/edoc_download.php/2804/pdf/imm2804.pdf """ if np.isscalar(w): w = np.eye(len(y)) elif np.ndim(w) == 1: w = np.diag(w) # sqrt of weight matrix sqrtw = np.sqrt(w) # A and ydata "tilde" sqrt_wa = np.dot(sqrtw, basis.T) w_ydata = np.dot(sqrtw, y) # solving system using 'short' QR decomposition (see R. Butt, Num. Anal. # Using MATLAB) q, r = sp.linalg.qr(sqrt_wa, mode="economic") return sp.linalg.solve(r, np.dot(q.T, w_ydata)) def _alan_qrd(self, basis: np.ndarray, y: np.ndarray, w: np.ndarray) -> np.ndarray: """Solve a linear system using QR decomposition implemented in C. This solves the system in the same way as Alan Roger's original C-code that was used for Bowman+2018. See https://github.com/edges-collab/alans-pipeline/blob/ 0e41156ddc7aaa3dd4b37cd1ee1ada971e68d728/src/edges2k.c#L2735 for the C-Code. This is the same as qrd-c *except* that the preparation of the A and b matrices is done in python rather than C. This actually makes a difference because both A and b are dot-products, and the sum in numpy uses pairwise summation which is more accurate than a naive accumulation as would be done in C. """ if np.isscalar(w): w = np.eye(len(y)) elif np.ndim(w) == 1: w = np.diag(w) npar, _ = basis.shape wa = np.dot(basis, w) bbrr = np.dot(wa, y) aarr = np.dot(wa, basis.T) assert bbrr.shape == (npar,) assert aarr.shape == (npar, npar) _, bb = _c_qrd(aarr, bbrr) return bb def _qrd_c(self, basis: np.ndarray, y: np.ndarray, w: np.ndarray) -> np.ndarray: """Solve a linear system using QR decomposition implemented in C. This solves the system in the same way as Alan Roger's original C-code that was used for Bowman+2018. See https://github.com/edges-collab/alans-pipeline/blob/ 0e41156ddc7aaa3dd4b37cd1ee1ada971e68d728/src/edges2k.c#L2735 for the C-Code. """ if np.isscalar(w): w = np.ones(len(y)) elif np.ndim(w) != 1: raise ValueError("Weights must be a 1D array or scalar for qrd_c method.") # solve the system a, b = _get_a_and_b(basis, y, w) _, bb = _c_qrd(a, b) return bb def _wls(self, van, y, w): """Ripped straight outta numpy for speed. Note: this function is written purely for speed, and is intended to *not* be highly generic. Don't replace this by statsmodels or even np.polyfit. They are significantly slower (>4x for statsmodels, 1.5x for polyfit). """ # set up the least squares matrices and apply weights. # Don't use inplace operations as they # can cause problems with NA. mask = w > 0 lhs = van[:, mask] * w[mask] rhs = y[mask] * w[mask] rcond = y.size * np.finfo(y.dtype).eps # Determine the norms of the design matrix columns. scl = np.sqrt(np.square(lhs).sum(1)) scl[scl == 0] = 1 # Solve the least squares problem. c, _resids, _rank, _s = np.linalg.lstsq((lhs.T / scl), rhs.T, rcond) return (c.T / scl).T def _ls(self, van, y): """Ripped straight outta numpy for speed. Note: this function is written purely for speed, and is intended to *not* be highly generic. Don't replace this by statsmodels or even np.polyfit. They are significantly slower (>4x for statsmodels, 1.5x for polyfit). """ rcond = y.size * np.finfo(y.dtype).eps # Determine the norms of the design matrix columns. scl = np.sqrt(np.square(van.T).sum(axis=0)) # Solve the least squares problem. return np.linalg.lstsq((van.T / scl), y.T, rcond)[0] / scl
[docs] @cached_property def model_parameters(self): """The best-fit model parameters.""" # Parameters need to be copied into this object, otherwise a new fit on the # parent model will change the model_parameters of this fit! return copy(self.fit.model.parameters)
[docs] def evaluate(self, x: np.ndarray | None = None) -> np.ndarray: """Evaluate the best-fit model. Parameters ---------- x : np.ndarray, optional The co-ordinates at which to evaluate the model. By default, use the input data co-ordinates. Returns ------- y : np.ndarray The best-fit model evaluated at ``x``. """ return self.fit(x=x)
[docs] @cached_property def residual(self) -> np.ndarray: """Residuals of data to model.""" return self.ydata - self.evaluate()
[docs] @cached_property def weighted_chi2(self) -> float: """The chi^2 of the weighted fit.""" return np.dot(self.residual.T, self.weights * self.residual)
[docs] @cached_property def reduced_weighted_chi2(self) -> float: """The weighted chi^2 divided by the degrees of freedom.""" return (1 / self.degrees_of_freedom) * self.weighted_chi2
[docs] @cached_property def weighted_rms(self) -> float: """The weighted root-mean-square of the residuals.""" return np.sqrt(self.weighted_chi2) / np.sum(self.weights)
[docs] @cached_property def hessian(self): """The Hessian matrix of the linear parameters.""" b = self.model.basis w = self.weights return (b * w).dot(b.T)
[docs] @cached_property def parameter_covariance(self) -> np.ndarray: """The Covariance matrix of the parameters.""" return np.linalg.inv(self.hessian)
[docs] @cached_property def parameter_correlation(self) -> np.ndarray: """The correlation matrix of the parameters.""" cov = self.parameter_covariance std = np.sqrt(np.diag(cov)) return cov / std[:, None] / std[None, :]
[docs] def get_sample(self, size: int | tuple[int] = 1): """Generate a random sample from the posterior distribution.""" rng = np.random.default_rng() return rng.multivariate_normal( mean=self.model_parameters, cov=self.parameter_covariance, size=size )
def _c_qrd(a: np.ndarray, b: np.ndarray): """Solve a linear system using QR decomposition. This solves the system in the same way as Alan Roger's original C-code that was used for Bowman+2018. """ dllpath = next(iter(Path(__file__).parent.glob("cqrd.*.so"))) dll = CDLL(dllpath) qrd = dll.qrd qrd.argtypes = [POINTER(c_longdouble), c_int, POINTER(c_double)] qrd.restype = None aa = a.copy().astype(np.longdouble) bb = b.copy().astype(np.float64) qrd( aa.ctypes.data_as(POINTER(c_longdouble)), b.shape[0], bb.ctypes.data_as(POINTER(c_double)), ) return aa, bb def _get_a_and_b( basis: np.ndarray, data: np.ndarray, weights: np.ndarray, ): dllpath = next(iter(Path(__file__).parent.glob("cqrd.*.so"))) dll = CDLL(dllpath) lfit = dll.get_a_and_b lfit.argtypes = [ c_int, c_int, POINTER(c_double), POINTER(c_double), POINTER(c_double), POINTER(c_longdouble), POINTER(c_double), ] lfit.restype = None a = np.zeros(basis.shape[0] * basis.shape[0], dtype=np.longdouble) b = np.zeros(basis.shape[0], dtype=np.float64) basis = basis.astype(np.float64) data = data.astype(np.float64) weights = weights.astype(np.float64) lfit( basis.shape[0], basis.shape[1], basis.ctypes.data_as(POINTER(c_double)), data.ctypes.data_as(POINTER(c_double)), weights.ctypes.data_as(POINTER(c_double)), a.ctypes.data_as(POINTER(c_longdouble)), b.ctypes.data_as(POINTER(c_double)), ) return a, b