Source code for fracridge.fracridge

"""

"""
import numpy as np
from numpy import interp
import warnings

from sklearn.base import BaseEstimator, MultiOutputMixin
from sklearn.utils.validation import (check_X_y, check_array, check_is_fitted,
                                      _check_sample_weight)
from sklearn.linear_model._base import _preprocess_data, _rescale_data
from sklearn.model_selection import GridSearchCV

# Module-wide constants
BIG_BIAS = 10e3
SMALL_BIAS = 10e-3
BIAS_STEP = 0.2
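# These constants bound the log-spaced grid of candidate alphas that
# `fracridge` builds below: the grid runs from roughly SMALL_BIAS times the
# smallest squared singular value of X up to BIG_BIAS times the largest, in
# steps of BIAS_STEP in log10 units (plus alpha=0 for the OLS solution).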


__all__ = ["fracridge", "vec_len", "FracRidgeRegressor",
           "FracRidgeRegressorCV"]


def _do_svd(X, y, jit=True):
    """
    Helper function to produce SVD outputs
    """
    if len(y.shape) == 1:
        y = y[:, np.newaxis]

    # By default, we try to use the jit-compiled SVD, which should be
    # more performant:
    use_scipy = False
    if jit:
        try:
            from ._linalg import svd
        except ImportError:
            warnings.warn("The `jit` key-word argument is set to `True` "\
                          "but numba could not be imported, or just-in time "\
                          "compilation failed. Falling back to "\
                          "`scipy.linalg.svd`")
            use_scipy = True

    # If that doesn't work, or you asked not to, we'll use scipy SVD:
    if not jit or use_scipy:
        from functools import partial
        from scipy.linalg import svd  # noqa
        svd = partial(svd, full_matrices=False)

    if X.shape[0] > X.shape[1]:
        uu, ss, v_t = svd(X.T @ X)
        selt = np.sqrt(ss)
        if y.shape[-1] >= X.shape[0]:
            ynew = (np.diag(1./selt) @ v_t @ X.T) @ y
        else:
            ynew = np.diag(1./selt) @ v_t @ (X.T @ y)

    else:
        # This rotates the targets by the unitary matrix uu.T:
        uu, selt, v_t = svd(X)
        ynew = uu.T @ y

    ols_coef = (ynew.T / selt).T

    return selt, v_t, ols_coef
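
# A minimal sketch (illustrative only) of why the tall-matrix branch above
# works: for n > p, the singular values of X.T @ X are the squared singular
# values of X, so `np.sqrt(ss)` recovers them without decomposing the full
# (n, p) matrix:
#
#   >>> from scipy.linalg import svd
#   >>> X = np.random.randn(100, 5)
#   >>> _, ss, _ = svd(X.T @ X)
#   >>> _, selt, _ = svd(X, full_matrices=False)
#   >>> bool(np.allclose(np.sqrt(ss), selt))
#   True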


def fracridge(X, y, fracs=None, tol=1e-10, jit=True):
    """
    Approximates alpha parameters to match desired fractions of OLS length.

    Parameters
    ----------
    X : ndarray, shape (n, p)
        Design matrix for regression, with n number of observations and
        p number of model parameters.

    y : ndarray, shape (n, b)
        Data, with n number of observations and b number of targets.

    fracs : float or 1d array, optional
        The desired fractions of the parameter vector length, relative to
        the OLS solution. If 1d array, the shape is (f,). This input is
        required to be sorted; otherwise, a ValueError is raised.
        Default: np.arange(.1, 1.1, .1).

    tol : float, optional
        Tolerance below which singular values of X are treated as zero.
        Default: 1e-10.

    jit : bool, optional
        Whether to speed up computations by using a just-in-time compiled
        version of core computations. This may not work well with very large
        datasets. Default: True

    Returns
    -------
    coef : ndarray, shape (p, f, b)
        The full estimated parameters across units of measurement for every
        desired fraction. If fracs is a float or y is a vector, the second
        or third dimension is squeezed, correspondingly.

    alphas : ndarray, shape (f, b)
        The alpha coefficients associated with each solution.

    Examples
    --------
    Generate random data:

    >>> np.random.seed(0)
    >>> y = np.random.randn(100)
    >>> X = np.random.randn(100, 10)

    Calculate coefficients with naive OLS:

    >>> coef = np.linalg.inv(X.T @ X) @ X.T @ y
    >>> print(np.linalg.norm(coef))  # doctest: +NUMBER
    0.35

    Call the fracridge function:

    >>> coef2, alpha = fracridge(X, y, 0.3)
    >>> print(np.linalg.norm(coef2))  # doctest: +NUMBER
    0.10
    >>> print(np.linalg.norm(coef2) / np.linalg.norm(coef))  # doctest: +NUMBER
    0.3

    Calculate coefficients with naive RR:

    >>> alphaI = alpha * np.eye(X.shape[1])
    >>> coef3 = np.linalg.inv(X.T @ X + alphaI) @ X.T @ y
    >>> print(np.linalg.norm(coef2 - coef3))  # doctest: +NUMBER
    0.0
    """
    if fracs is None:
        fracs = np.arange(.1, 1.1, .1)

    fracs_is_vector = hasattr(fracs, "__len__")
    if fracs_is_vector:
        if np.any(np.diff(fracs) < 0):
            raise ValueError("The `frac` inputs to the `fracridge` function "
                             f"must be sorted. You provided: {fracs}")
    else:
        fracs = [fracs]
    fracs = np.array(fracs)

    nn, pp = X.shape
    single_target = len(y.shape) == 1
    if single_target:
        y = y[:, np.newaxis]

    bb = y.shape[-1]
    ff = fracs.shape[0]

    # Calculate the rotation of the data:
    selt, v_t, ols_coef = _do_svd(X, y, jit=jit)

    # Set solutions for small singular values to 0 for all targets:
    isbad = selt < tol
    if np.any(isbad):
        warnings.warn("Some eigenvalues are being treated as 0")

    ols_coef[isbad, ...] = 0

    # Limits on the grid of candidate alphas used for interpolation:
    val1 = BIG_BIAS * selt[0] ** 2
    val2 = SMALL_BIAS * selt[-1] ** 2

    # Generate the grid of candidate alphas used in interpolation:
    alphagrid = np.concatenate(
        [np.array([0]),
         10 ** np.arange(np.floor(np.log10(val2)),
                         np.ceil(np.log10(val1)), BIAS_STEP)])

    # The scaling factor applied to coefficients in the rotated space is
    # lambda**2 / (lambda**2 + alpha), where lambda are the singular values:
    seltsq = selt**2
    sclg = seltsq / (seltsq + alphagrid[:, None])
    sclg_sq = sclg**2

    # Preallocate the solution:
    if nn >= pp:
        first_dim = pp
    else:
        first_dim = nn

    coef = np.empty((first_dim, ff, bb))
    alphas = np.empty((ff, bb))

    # The main loop is over targets:
    for ii in range(y.shape[-1]):
        # Apply the scaling factors per alpha:
        newlen = np.sqrt(sclg_sq @ ols_coef[..., ii]**2).T
        # Normalize to the length of the unregularized solution
        # (because alphagrid[0] == 0):
        newlen = (newlen / newlen[0])
        # Perform the interpolation in a log-transformed space (so that it
        # behaves nicely), avoiding log of 0:
        temp = interp(fracs, newlen[::-1], np.log(1 + alphagrid)[::-1])
        # Undo the log transform from the previous step:
        targetalphas = np.exp(temp) - 1
        # Allocate the alphas for this target:
        alphas[:, ii] = targetalphas
        # Calculate the new scaling factor, based on the interpolated alphas:
        sc = seltsq / (seltsq + targetalphas[np.newaxis].T)
        # Use the scaling factor to calculate coefficients in the rotated
        # space:
        coef[..., ii] = (sc * ols_coef[..., ii]).T

    # After iterating over all targets, we unrotate using the unitary v
    # matrix and reshape to conform to the desired output:
    coef = np.reshape(v_t.T @ coef.reshape((first_dim, ff * bb)),
                      (pp, ff, bb))

    if single_target:
        coef = coef.squeeze(2)
    if not fracs_is_vector:
        coef = coef.squeeze(1)

    return coef, alphas
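

# A minimal sketch (hypothetical values) of the interpolation step above:
# for one target, the fraction of the OLS length is a decreasing function
# of alpha, tabulated on the candidate grid and then inverted by linear
# interpolation in log(1 + alpha) space:
#
#   >>> selt = np.array([3., 2., 1.])            # singular values of X
#   >>> beta = np.array([1., 1., 1.])            # OLS coef, rotated space
#   >>> alphagrid = np.array([0., 1., 10., 100.])
#   >>> sclg = selt**2 / (selt**2 + alphagrid[:, None])
#   >>> newlen = np.sqrt((sclg**2) @ beta**2)
#   >>> newlen = newlen / newlen[0]              # fraction of OLS length
#   >>> alpha = np.exp(np.interp(0.5, newlen[::-1],
#   ...                          np.log(1 + alphagrid)[::-1])) - 1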


class FracRidgeRegressor(BaseEstimator, MultiOutputMixin):
    """
    Fractional ridge regression estimator.

    Parameters
    ----------
    fracs : float or sequence
        The desired fractions of the parameter vector length, relative to
        the OLS solution. Default: np.arange(.1, 1.1, .1)

    fit_intercept : bool, optional
        Whether to fit an intercept term. Default: False.

    normalize : bool, optional
        Whether to normalize the columns of X. Default: False.

    copy_X : bool, optional
        Whether to make a copy of the X matrix before fitting. Default: True.

    tol : float, optional
        Tolerance under which singular values of the X matrix are considered
        to be zero. Default: 1e-10.

    jit : bool, optional
        Whether to use the jit-accelerated implementation. Default: True.

    Attributes
    ----------
    coef_ : ndarray, shape (p, f, b)
        The full estimated parameters across units of measurement for every
        desired fraction, where p is the number of model parameters, f the
        number of fractions and b the number of targets.

    alpha_ : ndarray, shape (f, b)
        The alpha coefficients associated with each solution, where f is the
        number of fractions and b the number of targets.

    Examples
    --------
    Generate random data:

    >>> np.random.seed(1)
    >>> y = np.random.randn(100)
    >>> X = np.random.randn(100, 10)

    Calculate coefficients with naive OLS:

    >>> coef = np.linalg.inv(X.T @ X) @ X.T @ y

    Initialize the estimator with a single fraction:

    >>> fr = FracRidgeRegressor(fracs=0.3)

    Fit the estimator:

    >>> fr.fit(X, y)
    FracRidgeRegressor(fracs=0.3)

    Check the results:

    >>> coef_ = fr.coef_
    >>> alpha_ = fr.alpha_
    >>> print(np.linalg.norm(coef_) / np.linalg.norm(coef))  # doctest: +NUMBER
    0.29
    """

    def __init__(self, fracs=None, fit_intercept=False, normalize=False,
                 copy_X=True, tol=1e-10, jit=True):
        self.fracs = fracs
        self.fit_intercept = fit_intercept
        self.normalize = normalize
        self.copy_X = copy_X
        self.tol = tol
        self.jit = jit

    def _validate_input(self, X, y, sample_weight=None):
        """
        Helper function to validate the inputs
        """
        X, y = check_X_y(X, y, y_numeric=True, multi_output=True)
        if sample_weight is not None:
            sample_weight = _check_sample_weight(sample_weight, X,
                                                 dtype=X.dtype)
        X, y, X_offset, y_offset, X_scale = _preprocess_data(
            X, y, fit_intercept=self.fit_intercept, normalize=self.normalize,
            copy=self.copy_X, sample_weight=sample_weight,
            check_input=True)
        if sample_weight is not None:
            # Sample weight can be implemented via a simple rescaling:
            outs = _rescale_data(X, y, sample_weight)
            X, y = outs[0], outs[1]
        return X, y, X_offset, y_offset, X_scale

    def fit(self, X, y, sample_weight=None):
        X, y, X_offset, y_offset, X_scale = self._validate_input(
            X, y, sample_weight=sample_weight)
        coef, alpha = fracridge(X, y, fracs=self.fracs, tol=self.tol,
                                jit=self.jit)
        self.alpha_ = alpha
        self.coef_ = coef
        self._set_intercept(X_offset, y_offset, X_scale)
        self.is_fitted_ = True
        self.n_features_in_ = X.shape[1]
        return self

    def predict(self, X):
        X = check_array(X, accept_sparse=True)
        check_is_fitted(self, 'is_fitted_')
        if len(self.coef_.shape) == 0:
            pred_coef = self.coef_[np.newaxis]
        else:
            pred_coef = self.coef_
        pred = np.tensordot(X, pred_coef, axes=(1))
        if self.fit_intercept:
            pred = pred + self.intercept_
        return pred

    def _set_intercept(self, X_offset, y_offset, X_scale):
        """Set the intercept_"""
        if self.fit_intercept:
            if len(self.coef_.shape) <= 1:
                self.coef_ = self.coef_ / X_scale
            elif len(self.coef_.shape) == 2:
                self.coef_ = self.coef_ / X_scale[:, np.newaxis]
            elif len(self.coef_.shape) == 3:
                self.coef_ = self.coef_ / X_scale[:, np.newaxis, np.newaxis]
            self.intercept_ = y_offset - np.tensordot(X_offset, self.coef_,
                                                      axes=(0, 0))
        else:
            self.intercept_ = 0.

    def score(self, X, y, sample_weight=None):
        """
        Score the fracridge fit
        """
        from sklearn.metrics import r2_score
        y_pred = self.predict(X)
        if len(y_pred.shape) > len(y.shape):
            y = y[..., np.newaxis]
            y = np.broadcast_to(y, y_pred.shape)
        return r2_score(y, y_pred, sample_weight=sample_weight)

    def _more_tags(self):
        return {'multioutput': True}
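

# A minimal usage sketch for the estimator above (shapes hypothetical):
# with p=10 features, f=3 fractions and b=2 targets, `coef_` has shape
# (p, f, b) and `predict` returns shape (n, f, b):
#
#   >>> X = np.random.randn(100, 10)
#   >>> Y = np.random.randn(100, 2)
#   >>> fr = FracRidgeRegressor(fracs=[0.3, 0.6, 0.9]).fit(X, Y)
#   >>> fr.coef_.shape
#   (10, 3, 2)
#   >>> fr.predict(X).shape
#   (100, 3, 2)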


class FracRidgeRegressorCV(FracRidgeRegressor):
    """
    Uses :class:`sklearn.model_selection.GridSearchCV` to find the best value
    of `frac` given the data, using cross-validation.

    Parameters
    ----------
    fit_intercept : bool, optional
        Whether to fit an intercept term. Default: False.

    normalize : bool, optional
        Whether to normalize the columns of X. Default: False.

    copy_X : bool, optional
        Whether to make a copy of the X matrix before fitting. Default: True.

    tol : float, optional
        Tolerance under which singular values of the X matrix are considered
        to be zero. Default: 1e-10.

    jit : bool, optional
        Whether to use the jit-accelerated implementation. Default: True.

    cv : int, cross-validation generator or an iterable
        See https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html  # noqa

    scoring : str, callable, list/tuple or dict, default=None
        See https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html  # noqa

    Attributes
    ----------
    best_frac_ : float
        The best fraction, as determined by cross-validation.

    alpha_ : ndarray, shape (b,)
        The alpha coefficients associated with this fraction for each target.

    coef_ : ndarray, shape (p, b)
        The coefficients corresponding to the best solution, where p is the
        number of parameters and b the number of targets.

    Examples
    --------
    Generate random data:

    >>> np.random.seed(1)
    >>> y = np.random.randn(100)
    >>> X = np.random.randn(100, 10)

    Fit the model with cross-validation:

    >>> frcv = FracRidgeRegressorCV()
    >>> frcv.fit(X, y, frac_grid=np.array([0.1, 0.2, 0.3, 0.4, 0.5,
    ...                                    0.6, 0.7, 0.8, 0.9, 1.]))
    FracRidgeRegressorCV()
    >>> print(frcv.best_frac_)
    0.1
    """

    def __init__(self, fit_intercept=False, normalize=False,
                 copy_X=True, tol=1e-10, jit=True, cv=None, scoring=None):
        super().__init__(fit_intercept=fit_intercept, normalize=normalize,
                         copy_X=copy_X, tol=tol, jit=jit)
        self.cv = cv
        self.scoring = scoring

    def fit(self, X, y, sample_weight=None, frac_grid=None):
        """
        Parameters
        ----------
        frac_grid : sequence or float, optional
            The values of frac to consider. Default: np.arange(.1, 1.1, .1)
        """
        X, y, _, _, _ = self._validate_input(X, y, sample_weight=None)
        if frac_grid is None:
            frac_grid = np.arange(.1, 1.1, .1)
        parameters = {'fracs': frac_grid}
        gs = GridSearchCV(
            FracRidgeRegressor(fit_intercept=self.fit_intercept,
                               normalize=self.normalize,
                               copy_X=self.copy_X,
                               tol=self.tol, jit=self.jit),
            parameters, cv=self.cv, scoring=self.scoring)
        gs.fit(X, y, sample_weight=sample_weight)
        estimator = gs.best_estimator_
        self.best_score_ = gs.best_score_
        self.coef_ = estimator.coef_
        self.intercept_ = estimator.intercept_
        self.best_frac_ = estimator.fracs
        self.alpha_ = estimator.alpha_
        self.is_fitted_ = True
        self.n_features_in_ = X.shape[1]
        return self

    def _more_tags(self):
        return {'multioutput': True}
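

# Note: unlike `fracs` on FracRidgeRegressor, the candidate fractions for
# FracRidgeRegressorCV are passed to `fit` as `frac_grid`, because the grid
# search varies `fracs` internally as the hyperparameter under selection.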


def vec_len(vec, axis=0):
    """Compute the Euclidean length of `vec` along `axis`."""
    return np.sqrt((vec * vec).sum(axis=axis))
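

# Example (hypothetical values): `vec_len` reduces along `axis`, so for a
# (2, 3) array with the default axis=0 it returns the three column norms:
#
#   >>> vec_len(np.array([[3., 0., 1.], [4., 0., 1.]]))
#   array([5.        , 0.        , 1.41421356])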