Source code for bask.bayesgpr

from collections.abc import Iterable
from contextlib import contextmanager, nullcontext

import emcee as mc
import numpy as np
import scipy.stats as st
from scipy.linalg import cho_solve, cholesky, solve_triangular
from sklearn.utils import check_random_state
from skopt.learning import GaussianProcessRegressor
from skopt.learning.gaussian_process.gpr import _param_for_white_kernel_in_Sum
from skopt.learning.gaussian_process.kernels import WhiteKernel

from .utils import geometric_median, guess_priors, validate_zeroone

__all__ = ["BayesGPR"]


[docs]class BayesGPR(GaussianProcessRegressor): """ Gaussian process regressor of which the kernel hyperparameters are inferred in a fully Bayesian framework. The implementation is based on Algorithm 2.1 of Gaussian Processes for Machine Learning (GPML) by Rasmussen and Williams. In addition to standard scikit-learn estimator API, GaussianProcessRegressor: * allows prediction without prior fitting (based on the GP prior); * provides an additional method sample_y(X), which evaluates samples drawn from the GPR (prior or posterior or hyper-posterior) at given inputs; * exposes a method log_marginal_likelihood(theta), which can be used externally for other ways of selecting hyperparameters, e.g., via Markov chain Monte Carlo. * allows setting the kernel hyperparameters while correctly recalculating the required matrices * exposes a method noise_set_to_zero() which can be used as a context manager to temporarily set the prediction noise to zero. This is useful for evaluating acquisition functions for Bayesian optimization Parameters ---------- kernel : kernel object The kernel specifying the covariance function of the GP. If None is passed, the kernel "1.0 * RBF(1.0)" is used as default. Note that the kernel's hyperparameters are set to the geometric median of the Markov chain Monte Carlo samples of the posterior. alpha : float or array-like, optional (default: 1e-10) Value added to the diagonal of the kernel matrix during fitting. Larger values correspond to increased noise level in the observations. This can also prevent a potential numerical issue during fitting, by ensuring that the calculated values form a positive definite matrix. If an array is passed, it must have the same number of entries as the data used for fitting and is used as datapoint-dependent noise level. Note that this is equivalent to adding a WhiteKernel with c=alpha. Allowing to specify the noise level directly as a parameter is mainly for convenience and for consistency with Ridge. Also note, that this class adds a WhiteKernel automatically if noise is set. optimizer : string or callable, optional (default: "fmin_l_bfgs_b") Can either be one of the internally supported optimizers for optimizing the kernel's parameters, specified by a string, or an externally defined optimizer passed as a callable. If a callable is passed, it must have the signature:: def optimizer(obj_func, initial_theta, bounds): # * 'obj_func' is the objective function to be minimized, which # takes the hyperparameters theta as parameter and an # optional flag eval_gradient, which determines if the # gradient is returned additionally to the function value # * 'initial_theta': the initial value for theta, which can be # used by local optimizers # * 'bounds': the bounds on the values of theta .... # Returned are the best found hyperparameters theta and # the corresponding value of the target function. return theta_opt, func_min Per default, the 'fmin_l_bfgs_b' algorithm from scipy.optimize is used. If None is passed, the kernel's parameters are kept fixed. Available internal optimizers are:: 'fmin_l_bfgs_b' Note, that the kernel hyperparameters obtained are only used as the initial position of the Markov chain and will be discarded afterwards. n_restarts_optimizer : int, optional (default: 0) The number of restarts of the optimizer for finding the kernel's parameters which maximize the log-marginal likelihood. The first run of the optimizer is performed from the kernel's initial parameters, the remaining ones (if any) from thetas sampled log-uniform randomly from the space of allowed theta-values. If greater than 0, all bounds must be finite. Note that n_restarts_optimizer == 0 implies that one run is performed. normalize_y : boolean, optional (default: False) Whether the target values y are normalized, i.e., the mean of the observed target values become zero. This parameter should be set to True if the target values' mean is expected to differ considerable from zero. When enabled, the normalization effectively modifies the GP's prior based on the data, which contradicts the likelihood principle. warp_inputs : boolean, optional (default: False) If True, each input dimension will be warped (internally) using the cumulative distribution function of a beta distribution [1]_. The parameters of each beta distribution will be inferred from the data. The input data needs to be in [0, 1]. copy_X_train : bool, optional (default: True) If True, a persistent copy of the training data is stored in the object. Otherwise, just a reference to the training data is stored, which might cause predictions to change if the data is modified externally. random_state : int, RandomState instance or None, optional (default: None) The generator used to initialize the centers. If int, random_state is the seed used by the random number generator; If RandomState instance, random_state is the random number generator; If None, the random number generator is the RandomState instance used by `np.random`. noise : string, optional (default: "gaussian") If set to "gaussian", then it is assumed that `y` is a noisy estimate of `f(x)` where the noise is gaussian. A WhiteKernel will be added to the provided kernel. Attributes ---------- X_train_ : array-like, shape = (n_samples, n_features) Feature values in training data (also required for prediction) y_train_ : array-like, shape = (n_samples, [n_output_dims]) Target values in training data (also required for prediction) kernel_ : kernel object The kernel used for prediction. The structure of the kernel is the same as the one passed as parameter but with optimized hyperparameters L_ : array-like, shape = (n_samples, n_samples) Lower-triangular Cholesky decomposition of the kernel in ``X_train_`` alpha_ : array-like, shape = (n_samples,) Dual coefficients of training data points in kernel space log_marginal_likelihood_value_ : float The log-marginal-likelihood of ``self.kernel_.theta`` noise_ : float Estimate of the gaussian noise. Useful only when noise is set to "gaussian". chain_ : array-like, shape = (n_desired_samples, n_hyperparameters) Samples from the posterior distribution of the hyperparameters. pos_ : array-like, shape = (n_walkers, n_hyperparameters) Last position of the Markov chain. Useful for continuing sampling when new datapoints arrive. fit(X, y) internally uses an existing pos_ to resume sampling, if no other position is provided. References ---------- .. [1] Snoek, Jasper, Kevin Swersky, Richard Zemel, and Ryan P. Adams. “Input Warping for Bayesian Optimization of Non-Stationary Functions.” In Proceedings of the 31st International Conference on International Conference on Machine Learning - Volume 32, II–1674–II–1682. ICML’14. Beijing, China: JMLR.org, 2014. """
[docs] def __init__( self, kernel=None, alpha=1e-10, optimizer="fmin_l_bfgs_b", n_restarts_optimizer=0, normalize_y=False, warp_inputs=False, copy_X_train=True, random_state=None, noise="gaussian", ): if kernel is None: self._kernel = None else: self._kernel = kernel.clone_with_theta(kernel.theta) random_state = check_random_state(random_state) super().__init__( kernel, alpha, optimizer, n_restarts_optimizer, normalize_y, copy_X_train, random_state, noise, ) self._alpha = self.alpha self.warp_inputs = warp_inputs self._sampler = None self.chain_ = None self.pos_ = None self.kernel_ = None
@property def theta(self): """The current geometric median of the kernel hyperparameter distribution. The returned values are located in log space. Call `BayesGPR.kernel_` to obtain the values their original space. Returns ------- ndarray Array containing the kernel hyperparameters in log space. """ if self.kernel_ is not None: with np.errstate(divide="ignore"): return np.copy(self.kernel_.theta) return None @theta.setter def theta(self, theta): self.kernel_.theta = theta K = self.kernel_(self.X_train_) K[np.diag_indices_from(K)] += self.alpha try: self.L_ = cholesky(K, lower=True) L_inv = solve_triangular(self.L_.T, np.eye(self.L_.shape[0])) self.K_inv_ = L_inv.dot(L_inv.T) except np.linalg.LinAlgError as exc: exc.args = ( "The kernel, %s, is not returning a " "positive definite matrix. Try gradually " "increasing the 'alpha' parameter of your " "GaussianProcessRegressor estimator." % self.kernel_, ) + exc.args raise self.alpha_ = cho_solve((self.L_, True), self.y_train_) @property def X_train_(self): """ The training data which was used to train the Gaussian process. If input warping is used, it will return the warped instances. Returns ------- array-like, shape = (n_samples, n_features) Feature values in training data (also required for prediction). If `warp_inputs=True`, will contain the warped inputs in [0, 1]. """ if hasattr(self, "_X_train_orig_"): if self.warp_inputs: return self._X_train_warped_ return self._X_train_orig_ return None @X_train_.setter def X_train_(self, X_train): self._X_train_orig_ = np.copy(X_train) if self.copy_X_train else X_train if self.warp_inputs: self._X_train_warped_ = np.copy(self._X_train_orig_) if hasattr(self, "warpers_"): for col, warper in enumerate(self.warpers_): self._X_train_warped_[:, col] = warper(self._X_train_orig_[:, col]) # If no warpers exist yet, we begin with an unwarped input space
[docs] def warp(self, X): """Warp the input X using the existing warpers. Returns X if `warp_inputs=False` or if no warpers have been fit yet. Parameters ---------- X : ndarray, shape (n_points, n_dims) Points in the original space which should be warped. """ if self.warp_inputs and hasattr(self, "warpers_"): X_warped = np.empty_like(X) for col, warper in enumerate(self.warpers_): X_warped[:, col] = warper(X[:, col]) X = X_warped return X
[docs] def unwarp(self, X): """Unwarp the input X back to the original input space. Returns X if `warp_inputs=False` or if no warpers have been fit yet. Parameters ---------- X : ndarray, shape (n_points, n_dims) Points in the warped space which should be transformed back to the input space. """ if self.warp_inputs and hasattr(self, "warpers_"): X_orig = np.empty_like(X) for col, unwarper in enumerate(self.unwarpers_): X_orig[:, col] = unwarper(X[:, col]) X = X_orig return X
[docs] def rewarp(self): """Apply warping again to X_train_ after parameters have changed. Does nothing if `warp_inputs=False` or if no warpers have been fit yet. """ if self.warp_inputs: if hasattr(self, "warpers_") and hasattr(self, "_X_train_orig_"): self._X_train_warped_ = np.empty_like(self._X_train_orig_) for col, warper in enumerate(self.warpers_): self._X_train_warped_[:, col] = warper(self._X_train_orig_[:, col])
[docs] def create_warpers(self, alphas, betas): """Create Beta CDFs and inverse CDFs for input (un)warping. Parameters ---------- alphas : ndarray, shape (n_dims) Raw alpha parameters of the Beta distributions in log-space. betas : ndarray, shape (n_dims) Raw beta parameters of the Beta distributions in log-space. """ if self.warp_inputs: self.warpers_ = [] self.unwarpers_ = [] self.warp_alphas_ = np.copy(alphas) self.warp_betas_ = np.copy(betas) for a_log, b_log in zip(alphas, betas): a, b = np.exp(a_log), np.exp(b_log) dist = st.beta(a=a, b=b) self.warpers_.append(dist.cdf) self.unwarpers_.append(dist.ppf)
[docs] @contextmanager def noise_set_to_zero(self): """Context manager in which the noise of the Gaussian process is 0. This is useful when you want to predict the epistemic uncertainty of the Gaussian process without the noise. """ current_theta = self.theta try: # Now we set the noise to 0, but do NOT recalculate the alphas!: white_present, white_param = _param_for_white_kernel_in_Sum(self.kernel_) self.kernel_.set_params(**{white_param: WhiteKernel(noise_level=0.0)}) yield self finally: self.kernel_.theta = current_theta
def _apply_noise_vector(self, n_instances, noise_vector): # We apply the noise vector to self.alpha here, to avoid having to pull up # inherited code: if noise_vector is not None: if not np.iterable(self.alpha): alpha = np.ones(n_instances) * self.alpha elif not np.iterable(self._alpha): # we already changed self.alpha before alpha = np.ones(n_instances) * self._alpha alpha[: len(noise_vector)] += noise_vector self.alpha = alpha def _log_prob_fn(self, x, priors, warp_priors): lp = 0 if self.warp_inputs: n_dim = self.X_train_.shape[1] x_warp = x[-2 * n_dim :] x_gp = x[: len(x) - 2 * n_dim] alphas, betas = x_warp[:n_dim], x_warp[n_dim:] self.create_warpers(alphas, betas) self.rewarp() for a_log, b_log in zip(alphas, betas): if isinstance(warp_priors, Iterable): lp += warp_priors[0](a_log) lp += warp_priors[1](b_log) else: lp += warp_priors(a_log, b_log) else: x_gp = x if isinstance(priors, Iterable): for prior, val in zip(priors, x_gp): lp += prior(val) else: # Assume priors is a callable, which evaluates the log probability: lp += priors(x_gp) try: lp = lp + self.log_marginal_likelihood(theta=x_gp) except ValueError: return -np.inf if not np.isfinite(lp): return -np.inf return lp
[docs] def sample( self, X=None, y=None, noise_vector=None, n_threads=1, n_desired_samples=100, n_burnin=0, n_thin=1, n_walkers_per_thread=100, progress=False, priors=None, warp_priors=None, position=None, add=False, **kwargs ): """Sample from the posterior distribution of the hyper-parameters. Parameters ---------- X : ndarray, shape (n_points, n_dims), optional (default: None) Points at which the function is evaluated. If None, it will use the saved datapoints. y : ndarray, shape (n_points,), optional (default: None) Value(s) of the function at `X`. If None, it will use the saved values. noise_vector : Variance(s) of the function at `X`. If None, no additional noise is applied. n_threads : int, optional (default: 1) Number of threads to use during inference. This is currently not implemented. n_desired_samples : int, optional (default: 100) Number of hyperposterior samples to collect during inference. Must be a multiple of `n_walkers_per_thread`. n_burnin : int, optional (default: 0) Number of iterations to discard before collecting hyperposterior samples. Needs to be increased only, if the hyperposterior samples have not reached their typical set yet. Higher values increase the running time. n_thin : int, optional (default: 1) Only collect hyperposterior samples every k-th iteration. This can help reducing the autocorrelation of the collected samples, but reduces the total number of samples. n_walkers_per_thread : int, optional (default: 100) Number of MCMC ensemble walkers to employ during inference. progress : bool, optional (default: False) If True, show a progress bar during inference. priors : list or callable, optional (default: None) Log prior(s) for the kernel hyperparameters. Remember that the kernel hyperparameters are transformed into log space. Thus your priors need to perform the necessary change-of-variables. warp_priors : list or callable, optional (default: None) Log prior(s) for the parameters of the Beta distribution used to warp each dimension. Only used, if `warp_inputs=True`. By default uses a log-normal distribution with mean 0 and standard deviation of 0.5 for each parameter of the Beta distribution. This prior favors the identity transformation and sufficient data is needed to shift towards a stronger warping function. position : ndarray, shape (n_walkers, n_kernel_dims), optional (default: None) Starting position of the Markov chain. If None, it will use the current position. If this is None as well, it will try to initialize in a small ball. add : bool, optional (default: False) If True, all collected hyperposterior samples will be added to the existing samples in `BayesGPR.chain_`. Otherwise they will be replaced. kwargs : dict Additional keyword arguments for emcee.EnsembleSampler """ if X is None and not hasattr(self, "X_train_") or self.kernel_ is None: raise ValueError( """ It looks like you are trying to sample from the GP posterior without data. Pass X and y, or ensure that you call fit before sample. """ ) # We are only able to guess priors now, since BayesGPR can add # another WhiteKernel, when noise is set to "gaussian": if priors is None: priors = guess_priors(self.kernel_) if warp_priors is None: warp_priors = ( st.norm(loc=0.0, scale=0.3).logpdf, st.norm(loc=0.0, scale=0.3).logpdf, ) # Update data, if available: if X is not None: if self.normalize_y: self._y_train_mean = np.mean(y, axis=0) self._y_train_std = np.std(y, axis=0) else: self._y_train_mean = np.zeros(1) self._y_train_std = 1 self.y_train_std_ = self._y_train_std self.y_train_mean_ = self._y_train_mean y = (y - self.y_train_mean_) / self.y_train_std_ if noise_vector is not None: noise_vector = np.array(noise_vector) / np.power(self.y_train_std_, 2) self.X_train_ = np.copy(X) if self.copy_X_train else X self.y_train_ = np.copy(y) if self.copy_X_train else y self._apply_noise_vector(len(self.y_train_), noise_vector) n_dim = len(self.theta) n_walkers = n_threads * n_walkers_per_thread n_samples = int(np.ceil(n_desired_samples / n_walkers) + n_burnin) pos = None if position is not None: pos = position elif self.pos_ is not None: pos = self.pos_ if self.warp_inputs: added_dims = self.X_train_.shape[1] * 2 n_dim += added_dims if pos is None: theta = self.theta theta[np.isinf(theta)] = np.log(self.noise_) if self.warp_inputs: theta = np.concatenate([theta, np.zeros(added_dims)]) pos = [ theta + 1e-2 * self.random_state.randn(n_dim) for _ in range(n_walkers) ] self._sampler = mc.EnsembleSampler( nwalkers=n_walkers, ndim=n_dim, log_prob_fn=self._log_prob_fn, kwargs=dict(priors=priors, warp_priors=warp_priors), threads=n_threads, **kwargs ) rng = np.random.RandomState( self.random_state.randint(0, np.iinfo(np.int32).max) ) self._sampler.random_state = rng.get_state() pos, prob, state = self._sampler.run_mcmc(pos, n_samples, progress=progress) # if backup_file is not None: # with open(backup_file, "wb") as f: # np.save(f, pos) chain = self._sampler.get_chain(flat=True, discard=n_burnin, thin=n_thin) if add and self.chain_ is not None: self.chain_ = np.concatenate([self.chain_, chain]) else: self.chain_ = chain if self.warp_inputs: median = geometric_median(self.chain_) warp_params = median[len(self.theta) :] alphas = warp_params[: self.X_train_.shape[1]] betas = warp_params[self.X_train_.shape[1] :] self.create_warpers(alphas, betas) self.rewarp() self.theta = median[: len(self.theta)] else: self.theta = geometric_median(self.chain_) self.log_marginal_likelihood_value_ = self.log_marginal_likelihood( self.kernel_.theta, clone_kernel=False ) self.pos_ = pos
[docs] def fit( self, X, y, noise_vector=None, n_threads=1, n_desired_samples=100, n_burnin=10, n_walkers_per_thread=100, progress=True, priors=None, warp_priors=None, position=None, **kwargs ): """Fit the Gaussian process model to the given training data. Parameters ---------- X : ndarray, shape (n_points, n_dims) Points at which the function is evaluated. If None, it will use the saved datapoints. y : ndarray, shape (n_points,) Value(s) of the function at `X`. If None, it will use the saved values. noise_vector : Variance(s) of the function at `X`. If None, no additional noise is applied. n_threads : int, optional (default: 1) Number of threads to use during inference. This is currently not implemented. n_desired_samples : int, optional (default: 100) Number of hyperposterior samples to collect during inference. Must be a multiple of `n_walkers_per_thread`. n_burnin : int, optional (default: 0) Number of iterations to discard before collecting hyperposterior samples. Needs to be increased only, if the hyperposterior samples have not reached their typical set yet. Higher values increase the running time. n_walkers_per_thread : int, optional (default: 100) Number of MCMC ensemble walkers to employ during inference. progress : bool, optional (default: False) If True, show a progress bar during inference. priors : list or callable, optional (default: None) Log prior(s) for the kernel hyperparameters. Remember that the kernel hyperparameters are transformed into log space. Thus your priors need to perform the necessary change-of-variables. position : ndarray, shape (n_walkers, n_kernel_dims), optional (default: None) Starting position of the Markov chain. If None, it will use the current position. If this is None as well, it will try to initialize in a small ball. kwargs : dict Additional keyword arguments for BayesGPR.sample """ self.kernel = self._kernel if self.normalize_y and noise_vector is not None: y_std = np.std(y, axis=0) noise_vector = np.array(noise_vector) / np.power(y_std, 2) self._apply_noise_vector(len(y), noise_vector) super().fit(X, y) self.sample( n_threads=n_threads, n_desired_samples=n_desired_samples, n_burnin=n_burnin, n_walkers_per_thread=n_walkers_per_thread, progress=progress, priors=priors, warp_priors=warp_priors, position=position, add=False, **kwargs )
[docs] def predict( self, X, return_std=False, return_cov=False, return_mean_grad=False, return_std_grad=False, ): if self.warp_inputs: validate_zeroone(X) X = self.warp(X) return super().predict( X, return_std, return_cov, return_mean_grad, return_std_grad )
[docs] def sample_y(self, X, sample_mean=False, noise=False, n_samples=1, random_state=0): """Sample function realizations of the Gaussian process. Parameters ---------- X : ndarray, shape (n_points, n_dims) Points at which to evaluate the functions. sample_mean : bool, optional (default: False) If True, the geometric median of the hyperposterior samples is used as the Gaussian process to sample from. If False, a new set of hyperposterior is used for each new sample. noise : bool, optional (default: False) If True, Gaussian noise is added to the samples. n_samples : int, optional (default: 1) Number of samples to draw from the Gaussian process(es). random_state : int or RandomState or None, optional, default=None Pseudo random number generator state used for random uniform sampling from lists of possible values instead of scipy.stats distributions. Returns ------- result : ndarray, shape (n_points, n_samples) Samples from the Gaussian process(es) Raises ------ ValueError If `warp_inputs=True` and the entries of X are not all between 0 and 1. """ rng = check_random_state(random_state) if sample_mean: if noise: cm = nullcontext(self) else: cm = self.noise_set_to_zero() with cm: samples = super().sample_y(X, n_samples=n_samples, random_state=rng) return samples ind = rng.choice(len(self.chain_), size=n_samples, replace=True) if self.warp_inputs: current_warp_alphas = np.copy(self.warp_alphas_) current_warp_betas = np.copy(self.warp_betas_) current_theta = self.theta n_dims = len(current_theta) current_K_inv = np.copy(self.K_inv_) current_L = np.copy(self.L_) current_alpha = np.copy(self.alpha_) result = np.empty((X.shape[0], n_samples)) for i, j in enumerate(ind): if self.warp_inputs: validate_zeroone(X) theta = self.chain_[j][:n_dims] warp_params = self.chain_[j][n_dims:] alphas, betas = warp_params[: X.shape[1]], warp_params[X.shape[1] :] self.create_warpers(alphas, betas) self.rewarp() else: theta = self.chain_[j] self.theta = theta if noise: cm = nullcontext(self) else: cm = self.noise_set_to_zero() with cm: result[:, i] = ( super().sample_y(X, n_samples=1, random_state=rng).flatten() ) self.kernel_.theta = current_theta self.K_inv_ = current_K_inv self.alpha_ = current_alpha if self.warp_inputs: self.warp_alphas_ = current_warp_alphas self.warp_betas_ = current_warp_betas self.L_ = current_L return result