Source code for linear_operator_learning.kernel.regressors

"""Kernel-based regressors for linear operators."""

from math import sqrt
from typing import Literal
from warnings import warn

import numpy as np
import scipy.linalg
from numpy import ndarray
from scipy.sparse.linalg import eigs, eigsh

from linear_operator_learning.kernel.linalg import (
    add_diagonal_,
    stable_topk,
    weighted_norm,
)
from linear_operator_learning.kernel.structs import EigResult, FitResult
from linear_operator_learning.kernel.utils import sanitize_complex_conjugates

__all__ = [
    "predict",
    "eig",
    "evaluate_eigenfunction",
    "pcr",
    "nystroem_pcr",
    "reduced_rank",
    "nystroem_reduced_rank",
    "rand_reduced_rank",
]


[docs] def eig( fit_result: FitResult, K_X: ndarray, # Kernel matrix of the input data K_YX: ndarray, # Kernel matrix between the output data and the input data ) -> EigResult: """Computes the eigendecomposition of a regressor. Args: fit_result (FitResult): Fit result as defined in ``linear_operator_learning.kernel.structs``. K_X (ndarray): Kernel matrix of the input data. K_YX (ndarray): Kernel matrix between the output data and the input data. Shape: ``K_X``: :math:`(N, N)`, where :math:`N` is the sample size. ``K_YX``: :math:`(N, N)`, where :math:`N` is the sample size. Output: ``U, V`` of shape :math:`(N, R)`, ``svals`` of shape :math:`R` where :math:`N` is the sample size and :math:`R` is the rank of the regressor. """ # SUV.TZ -> V.T K_YX U (right ev = SUvr, left ev = ZVvl) U = fit_result["U"] V = fit_result["V"] r_dim = (K_X.shape[0]) ** (-1) W_YX = np.linalg.multi_dot([V.T, r_dim * K_YX, U]) W_X = np.linalg.multi_dot([U.T, r_dim * K_X, U]) values, vl, vr = scipy.linalg.eig(W_YX, left=True, right=True) # Left -> V, Right -> U values = sanitize_complex_conjugates(values) r_perm = np.argsort(values) vr = vr[:, r_perm] l_perm = np.argsort(values.conj()) vl = vl[:, l_perm] values = values[r_perm] rcond = 1000.0 * np.finfo(U.dtype).eps # Normalization in RKHS norm_r = weighted_norm(vr, W_X) norm_r = np.where(norm_r < rcond, np.inf, norm_r) vr = vr / norm_r # Bi-orthogonality of left eigenfunctions norm_l = np.diag(np.linalg.multi_dot([vl.T, W_YX, vr])) norm_l = np.where(np.abs(norm_l) < rcond, np.inf, norm_l) vl = vl / norm_l result: EigResult = {"values": values, "left": V @ vl, "right": U @ vr} return result
[docs] def evaluate_eigenfunction( eig_result: EigResult, which: Literal["left", "right"], K_Xin_X_or_Y: ndarray, ): """Evaluates left or right eigenfunctions of a regressor. Args: eig_result: EigResult object containing eigendecomposition results which: String indicating "left" or "right" eigenfunctions K_Xin_X_or_Y: Kernel matrix between initial conditions and input data (for right eigenfunctions) or output data (for left eigenfunctions) Shape: ``eig_result``: ``U, V`` of shape :math:`(N, R)`, ``svals`` of shape :math:`R` where :math:`N` is the sample size and :math:`R` is the rank of the regressor. ``K_Xin_X_or_Y``: :math:`(N_0, N)`, where :math:`N_0` is the number of inputs to predict and :math:`N` is the sample size. Output: :math:`(N_0, R)` """ vr_or_vl = eig_result[which] rsqrt_dim = (K_Xin_X_or_Y.shape[1]) ** (-0.5) return np.linalg.multi_dot([rsqrt_dim * K_Xin_X_or_Y, vr_or_vl])
[docs] def predict( num_steps: int, fit_result: FitResult, kernel_YX: ndarray, kernel_Xin_X: ndarray, obs_train_Y: ndarray, ) -> ndarray: """Predicts future states given initial values using a fitted regressor. Args: num_steps (int): Number of steps to predict forward (returns the last prediction) fit_result (FitResult): FitResult object containing fitted U and V matrices kernel_YX (ndarray): Kernel matrix between output data and input data (or inducing points for Nystroem) kernel_Xin_X (ndarray): Kernel matrix between initial conditions and input data (or inducing points for Nystroem) obs_train_Y (ndarray): Observable evaluated on output training data (or inducing points for Nystroem) Shape: ``kernel_YX``: :math:`(N, N)`, where :math:`N` is the number of training data, or inducing points for Nystroem. ``kernel_Xin_X``: :math:`(N_0, N)`, where :math:`N_0` is the number of inputs to predict. ``obs_train_Y``: :math:`(N, *)`, where :math:`*` is the shape of the observable. Output: :math:`(N, *)`. """ # G = S UV.T Z # G^n = (SU)(V.T K_YX U)^(n-1)(V.T Z) U = fit_result["U"] V = fit_result["V"] npts = U.shape[0] K_dot_U = kernel_Xin_X @ U / sqrt(npts) V_dot_obs = V.T @ obs_train_Y / sqrt(npts) V_K_YX_U = np.linalg.multi_dot([V.T, kernel_YX, U]) / npts M = np.linalg.matrix_power(V_K_YX_U, num_steps - 1) return np.linalg.multi_dot([K_dot_U, M, V_dot_obs])
[docs] def pcr( kernel_X: ndarray, tikhonov_reg: float = 0.0, rank: int | None = None, svd_solver: Literal["arnoldi", "full"] = "arnoldi", ) -> FitResult: """Fits the Principal Components estimator. Args: kernel_X (ndarray): Kernel matrix of the input data. tikhonov_reg (float, optional): Tikhonov (ridge) regularization parameter. Defaults to 0.0. rank (int | None, optional): Rank of the estimator. Defaults to None. svd_solver (Literal[ &quot;arnoldi&quot;, &quot;full&quot; ], optional): Solver for the generalized eigenvalue problem. Defaults to "arnoldi". Shape: ``kernel_X``: :math:`(N, N)`, where :math:`N` is the number of training data. """ npts = kernel_X.shape[0] add_diagonal_(kernel_X, npts * tikhonov_reg) if svd_solver == "arnoldi": _num_arnoldi_eigs = min(rank + 5, kernel_X.shape[0]) values, vectors = eigsh(kernel_X, k=_num_arnoldi_eigs) elif svd_solver == "full": values, vectors = scipy.linalg.eigh(kernel_X) else: raise ValueError(f"Unknown svd_solver {svd_solver}") add_diagonal_(kernel_X, -npts * tikhonov_reg) values, stable_values_idxs = stable_topk(values, rank, ignore_warnings=False) vectors = vectors[:, stable_values_idxs] Q = sqrt(npts) * vectors / np.sqrt(values) kernel_X_eigvalsh = np.sqrt(np.abs(values)) / npts result: FitResult = {"U": Q, "V": Q, "svals": kernel_X_eigvalsh} return result
[docs] def nystroem_pcr( kernel_X: ndarray, # Kernel matrix of the input inducing points kernel_Y: ndarray, # Kernel matrix of the output inducing points kernel_Xnys: ndarray, # Kernel matrix between the input data and the input inducing points kernel_Ynys: ndarray, # Kernel matrix between the output data and the output inducing points tikhonov_reg: float = 0.0, # Tikhonov (ridge) regularization parameter (can be 0) rank: int | None = None, # Rank of the estimator svd_solver: Literal["arnoldi", "full"] = "arnoldi", ) -> FitResult: """Fits the Principal Components estimator using the Nyström method from :footcite:t:`Meanti2023`. Args: kernel_X (ndarray): Kernel matrix of the input inducing points. kernel_Y (ndarray): Kernel matrix of the output inducing points. kernel_Xnys (ndarray): Kernel matrix between the input data and the input inducing points. kernel_Ynys (ndarray): Kernel matrix between the output data and the output inducing points. tikhonov_reg (float, optional): Tikhonov (ridge) regularization parameter. Defaults to 0.0. rank (int | None, optional): Rank of the estimator. Defaults to None. svd_solver (Literal[ "arnoldi", "full" ], optional): Solver for the generalized eigenvalue problem. Defaults to "arnoldi". Shape: ``kernel_X``: :math:`(N, N)`, where :math:`N` is the number of training data. ``kernel_Y``: :math:`(N, N)`. ``kernel_Xnys``: :math:`(N, M)`, where :math:`M` is the number of Nystroem centers (inducing points). ``kernel_Ynys``: :math:`(N, M)`. """ ncenters = kernel_X.shape[0] npts = kernel_Xnys.shape[0] eps = 1000 * np.finfo(kernel_X.dtype).eps reg = max(eps, tikhonov_reg) kernel_Xnys_sq = kernel_Xnys.T @ kernel_Xnys add_diagonal_(kernel_X, reg * ncenters) if svd_solver == "full": values, vectors = scipy.linalg.eigh( kernel_Xnys_sq, kernel_X ) # normalization leads to needing to invert evals elif svd_solver == "arnoldi": _oversampling = max(10, 4 * int(np.sqrt(rank))) _num_arnoldi_eigs = min(rank + _oversampling, ncenters) values, vectors = eigsh( kernel_Xnys_sq, M=kernel_X, k=_num_arnoldi_eigs, which="LM", ) else: raise ValueError(f"Unknown svd_solver {svd_solver}") add_diagonal_(kernel_X, -reg * ncenters) values, stable_values_idxs = stable_topk(values, rank, ignore_warnings=False) vectors = vectors[:, stable_values_idxs] U = sqrt(ncenters) * vectors / np.sqrt(values) V = np.linalg.multi_dot([kernel_Ynys.T, kernel_Xnys, vectors]) V = scipy.linalg.lstsq(kernel_Y, V)[0] V = sqrt(ncenters) * V / np.sqrt(values) kernel_X_eigvalsh = np.sqrt(np.abs(values)) / npts result: FitResult = {"U": U, "V": V, "svals": kernel_X_eigvalsh} return result
[docs] def reduced_rank( kernel_X: ndarray, # Kernel matrix of the input data kernel_Y: ndarray, # Kernel matrix of the output data tikhonov_reg: float, # Tikhonov (ridge) regularization parameter, can be 0 rank: int, # Rank of the estimator svd_solver: Literal["arnoldi", "full"] = "arnoldi", ) -> FitResult: """Fits the Reduced Rank estimator from :footcite:t:`Kostic2022`. Args: kernel_X (ndarray): Kernel matrix of the input data. kernel_Y (ndarray): Kernel matrix of the output data. tikhonov_reg (float): Tikhonov (ridge) regularization parameter. rank (int): Rank of the estimator. svd_solver (Literal[ "arnoldi", "full" ], optional): Solver for the generalized eigenvalue problem. Defaults to "arnoldi". Shape: ``kernel_X``: :math:`(N, N)`, where :math:`N` is the number of training data. ``kernel_Y``: :math:`(N, N)`. """ # Number of data points npts = kernel_X.shape[0] eps = 1000.0 * np.finfo(kernel_X.dtype).eps penalty = max(eps, tikhonov_reg) * npts A = (kernel_Y / sqrt(npts)) @ (kernel_X / sqrt(npts)) add_diagonal_(kernel_X, penalty) # Find U via Generalized eigenvalue problem equivalent to the SVD. If K is ill-conditioned might be slow. # Prefer svd_solver == 'randomized' in such a case. if svd_solver == "arnoldi": # Adding a small buffer to the Arnoldi-computed eigenvalues. num_arnoldi_eigs = min(rank + 5, npts) values, vectors = eigs(A, k=num_arnoldi_eigs, M=kernel_X) elif svd_solver == "full": # 'full' values, vectors = scipy.linalg.eig(A, kernel_X, overwrite_a=True, overwrite_b=True) else: raise ValueError(f"Unknown svd_solver: {svd_solver}") # Remove the penalty from kernel_X (inplace) add_diagonal_(kernel_X, -penalty) values, stable_values_idxs = stable_topk(values, rank, ignore_warnings=False) vectors = vectors[:, stable_values_idxs] # Compare the filtered eigenvalues with the regularization strength, and warn if there are any eigenvalues that are smaller than the regularization strength. if not np.all(np.abs(values) >= tikhonov_reg): warn( f"Warning: {(np.abs(values) < tikhonov_reg).sum()} out of the {len(values)} squared singular values are smaller than the regularization strength {tikhonov_reg:.2e}. Consider redudcing the regularization strength to avoid overfitting." ) # Eigenvector normalization kernel_X_vecs = np.dot(kernel_X / sqrt(npts), vectors) vecs_norm = np.sqrt( np.sum( kernel_X_vecs**2 + tikhonov_reg * kernel_X_vecs * vectors * sqrt(npts), axis=0, ) ) norm_rcond = 1000.0 * np.finfo(values.dtype).eps values, stable_values_idxs = stable_topk(vecs_norm, rank, rcond=norm_rcond) U = vectors[:, stable_values_idxs] / vecs_norm[stable_values_idxs] # Ordering the results V = kernel_X @ U svals = np.sqrt(np.abs(values)) result: FitResult = {"U": U.real, "V": V.real, "svals": svals} return result
[docs] def nystroem_reduced_rank( kernel_X: ndarray, # Kernel matrix of the input inducing points kernel_Y: ndarray, # Kernel matrix of the output inducing points kernel_Xnys: ndarray, # Kernel matrix between the input data and the input inducing points kernel_Ynys: ndarray, # Kernel matrix between the output data and the output inducing points tikhonov_reg: float, # Tikhonov (ridge) regularization parameter rank: int, # Rank of the estimator svd_solver: Literal["arnoldi", "full"] = "arnoldi", ) -> FitResult: """Fits the Nyström Reduced Rank estimator from :footcite:t:`Meanti2023`. Args: kernel_X (ndarray): Kernel matrix of the input inducing points. kernel_Y (ndarray): Kernel matrix of the output inducing points. kernel_Xnys (ndarray): Kernel matrix between the input data and the input inducing points. kernel_Ynys (ndarray): Kernel matrix between the output data and the output inducing points. tikhonov_reg (float): Tikhonov (ridge) regularization parameter. rank (int): Rank of the estimator. svd_solver (Literal[ "arnoldi", "full" ], optional): Solver for the generalized eigenvalue problem. Defaults to "arnoldi". Shape: ``kernel_X``: :math:`(N, N)`, where :math:`N` is the number of training data. ``kernel_Y``: :math:`(N, N)`. ``kernel_Xnys``: :math:`(N, M)`, where :math:`M` is the number of Nystroem centers (inducing points). ``kernel_Ynys``: :math:`(N, M)`. """ num_points = kernel_Xnys.shape[0] num_centers = kernel_X.shape[0] eps = 1000 * np.finfo(kernel_X.dtype).eps * num_centers reg = max(eps, tikhonov_reg) # LHS of the generalized eigenvalue problem sqrt_Mn = sqrt(num_centers * num_points) kernel_YX_nys = (kernel_Ynys.T / sqrt_Mn) @ (kernel_Xnys / sqrt_Mn) _tmp_YX = scipy.linalg.lstsq(kernel_Y * (num_centers**-1), kernel_YX_nys)[0] kernel_XYX = kernel_YX_nys.T @ _tmp_YX # RHS of the generalized eigenvalue problem kernel_Xnys_sq = (kernel_Xnys.T / sqrt_Mn) @ (kernel_Xnys / sqrt_Mn) + reg * kernel_X * ( num_centers**-1 ) add_diagonal_(kernel_Xnys_sq, eps) A = scipy.linalg.lstsq(kernel_Xnys_sq, kernel_XYX)[0] if svd_solver == "full": values, vectors = scipy.linalg.eigh( kernel_XYX, kernel_Xnys_sq ) # normalization leads to needing to invert evals elif svd_solver == "arnoldi": _oversampling = max(10, 4 * int(np.sqrt(rank))) _num_arnoldi_eigs = min(rank + _oversampling, num_centers) values, vectors = eigs(kernel_XYX, k=_num_arnoldi_eigs, M=kernel_Xnys_sq) else: raise ValueError(f"Unknown svd_solver {svd_solver}") add_diagonal_(kernel_Xnys_sq, -eps) values, stable_values_idxs = stable_topk(values, rank, ignore_warnings=False) vectors = vectors[:, stable_values_idxs] # Compare the filtered eigenvalues with the regularization strength, and warn if there are any eigenvalues that are smaller than the regularization strength. if not np.all(np.abs(values) >= tikhonov_reg): warn( f"Warning: {(np.abs(values) < tikhonov_reg).sum()} out of the {len(values)} squared singular values are smaller than the regularization strength {tikhonov_reg:.2e}. Consider redudcing the regularization strength to avoid overfitting." ) # Eigenvector normalization vecs_norm = np.sqrt(np.abs(np.sum(vectors.conj() * (kernel_XYX @ vectors), axis=0))) norm_rcond = 1000.0 * np.finfo(values.dtype).eps values, stable_values_idxs = stable_topk(vecs_norm, rank, rcond=norm_rcond) vectors = vectors[:, stable_values_idxs] / vecs_norm[stable_values_idxs] U = A @ vectors V = _tmp_YX @ vectors svals = np.sqrt(np.abs(values)) result: FitResult = {"U": U.real, "V": V.real, "svals": svals} return result
[docs] def rand_reduced_rank( kernel_X: ndarray, kernel_Y: ndarray, tikhonov_reg: float, rank: int, n_oversamples: int = 5, optimal_sketching: bool = False, iterated_power: int = 1, rng_seed: int | None = None, precomputed_cholesky=None, ) -> FitResult: """Fits the Randomized Reduced Rank Estimator from :footcite:t:`Turri2023`. Args: kernel_X (ndarray): Kernel matrix of the input data kernel_Y (ndarray): Kernel matrix of the output data tikhonov_reg (float): Tikhonov (ridge) regularization parameter rank (int): Rank of the estimator n_oversamples (int, optional): Number of Oversamples. Defaults to 5. optimal_sketching (bool, optional): Whether to use optimal sketching (slower but more accurate) or not.. Defaults to False. iterated_power (int, optional): Number of iterations of the power method. Defaults to 1. rng_seed (int | None, optional): Random Number Generators seed. Defaults to None. precomputed_cholesky (optional): Precomputed Cholesky decomposition. Should be the output of cho_factor evaluated on the regularized kernel matrix.. Defaults to None. Shape: ``kernel_X``: :math:`(N, N)`, where :math:`N` is the number of training data. ``kernel_Y``: :math:`(N, N)`. """ rng = np.random.default_rng(rng_seed) npts = kernel_X.shape[0] penalty = npts * tikhonov_reg add_diagonal_(kernel_X, penalty) if precomputed_cholesky is None: cholesky_decomposition = scipy.linalg.cho_factor(kernel_X) else: cholesky_decomposition = precomputed_cholesky add_diagonal_(kernel_X, -penalty) sketch_dimension = rank + n_oversamples if optimal_sketching: cov = kernel_Y / npts sketch = rng.multivariate_normal( np.zeros(npts, dtype=kernel_Y.dtype), cov, size=sketch_dimension ).T else: sketch = rng.standard_normal(size=(npts, sketch_dimension)) for _ in range(iterated_power): # Powered randomized rangefinder sketch = (kernel_Y / npts) @ ( sketch - penalty * scipy.linalg.cho_solve(cholesky_decomposition, sketch) ) sketch, _ = scipy.linalg.qr(sketch, mode="economic") # QR re-orthogonalization kernel_X_sketch = scipy.linalg.cho_solve(cholesky_decomposition, sketch) _M = sketch - penalty * kernel_X_sketch F_0 = sketch.T @ sketch - penalty * (sketch.T @ kernel_X_sketch) # Symmetric F_0 = 0.5 * (F_0 + F_0.T) F_1 = _M.T @ ((kernel_Y @ _M) / npts) values, vectors = scipy.linalg.eig(scipy.linalg.lstsq(F_0, F_1)[0]) values, stable_values_idxs = stable_topk(values, rank, ignore_warnings=False) vectors = vectors[:, stable_values_idxs] # Remove elements in the kernel of F_0 relative_norm_sq = np.abs( np.sum(vectors.conj() * (F_0 @ vectors), axis=0) / np.linalg.norm(vectors, axis=0) ** 2 ) norm_rcond = 1000.0 * np.finfo(values.dtype).eps values, stable_values_idxs = stable_topk(relative_norm_sq, rank, rcond=norm_rcond) vectors = vectors[:, stable_values_idxs] vecs_norms = (np.sum(vectors.conj() * (F_0 @ vectors), axis=0).real) ** 0.5 vectors = vectors / vecs_norms U = sqrt(npts) * kernel_X_sketch @ vectors V = sqrt(npts) * _M @ vectors svals = np.sqrt(values) result: FitResult = {"U": U, "V": V, "svals": svals} return result