Source code for bayesflow.computational_utilities

# Copyright (c) 2022 The BayesFlow Developers

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import numpy as np
import tensorflow as tf
from scipy import stats
from sklearn.calibration import calibration_curve
from sklearn.model_selection import KFold, cross_val_score
from sklearn.neural_network import MLPClassifier

from bayesflow.default_settings import MMD_BANDWIDTH_LIST
from bayesflow.exceptions import ShapeError


def posterior_calibration_error(
    posterior_samples,
    prior_samples,
    alpha_resolution=20,
    aggregator_fun=np.median,
    min_quantile=0.005,
    max_quantile=0.995,
):
    """Computes an aggregate score for the marginal calibration error over an ensemble of approximate
    posteriors. The calibration error is given as the aggregate (e.g., median) of the absolute deviation
    between an alpha-CI and the relative number of inliers from ``prior_samples`` over multiple alphas in
    (0, 1).

    Note: The function will use posterior quantiles for determining the credibility intervals. An alternative
    definition of the calibration error is possible using highest density intervals (HDIs).

    Parameters
    ----------
    posterior_samples : np.ndarray of shape (num_datasets, num_draws, num_params)
        The random draws from the approximate posteriors over ``num_datasets``
    prior_samples     : np.ndarray of shape (num_datasets, num_params)
        The corresponding ground-truth values sampled from the prior
    alpha_resolution  : int, optional, default: 20
        The number of credibility intervals (CIs) to consider
    aggregator_fun    : callable or None, optional, default: np.median
        The function used to aggregate the marginal calibration errors.
        If ``None`` provided, the per-alpha calibration errors will be returned.
    min_quantile      : float in (0, 1), optional, default: 0.005
        The minimum posterior quantile to consider
    max_quantile      : float in (0, 1), optional, default: 0.995
        The maximum posterior quantile to consider

    Returns
    -------
    calibration_errors : np.ndarray of shape (num_params, ) or (alpha_resolution, num_params),
        if ``aggregator_fun is None``. The aggregated calibration error per marginal posterior.
    """

    num_params = prior_samples.shape[1]
    alphas = np.linspace(min_quantile, max_quantile, alpha_resolution)
    absolute_errors = np.zeros((alpha_resolution, num_params))

    for i, alpha in enumerate(alphas):
        # Find lower and upper bounds of posterior distribution
        region = 1 - alpha
        lower = region / 2
        upper = 1 - (region / 2)

        # Compute percentiles for given alpha using the entire posterior sample
        quantiles = np.quantile(posterior_samples, [lower, upper], axis=1)

        # Compute the relative number of inliers
        higher_mask = quantiles[0] <= prior_samples
        lower_mask = prior_samples <= quantiles[1]
        inlier_id = np.logical_and(higher_mask, lower_mask)
        alpha_pred = np.mean(inlier_id, axis=0)
        absolute_errors[i] = np.abs(alpha_pred - alpha)

    if aggregator_fun is not None:
        calibration_errors = aggregator_fun(absolute_errors, axis=0)
        return calibration_errors
    return absolute_errors
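

# Minimal usage sketch (illustrative): the shapes and random draws below are assumptions
# chosen only to demonstrate the call signature and the output shape.
_post_draws = np.random.default_rng(42).normal(size=(100, 500, 3))  # (num_datasets, num_draws, num_params)
_prior_draws = np.random.default_rng(43).normal(size=(100, 3))      # (num_datasets, num_params)
_cal_errors = posterior_calibration_error(_post_draws, _prior_draws)  # median error per parameter, shape (3,)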


def compute_jacobian_trace(function, inputs, **kwargs):
    """Computes the exact Jacobian trace of function with respect to inputs. Suitable for low dimensions (<32).

    Parameters
    ----------
    function : callable
        The function whose Jacobian at inputs will be computed
    inputs   : tf.Tensor of shape (batch_size, ...)
        The tensor with respect to which we are computing the Jacobian

    Returns
    -------
    outputs : tf.Tensor of shape (batch_size, ...)
        The outputs of ``function``
    trace   : tf.Tensor of shape (batch_size, ...)
        The trace of the Jacobian at inputs.
    """

    if len(inputs.shape) == 2:
        batch_size, dims = inputs.shape
        trace = tf.zeros((batch_size,))
    else:
        batch_size, num_reps, dims = inputs.shape
        trace = tf.zeros((batch_size, num_reps))

    with tf.GradientTape(persistent=True) as tape:
        tape.watch(inputs)
        outputs = function(inputs, **kwargs)

    for step in range(dims):
        dummy = tf.cast(step * tf.ones(trace.shape), tf.int32)
        epsilon = tf.one_hot(dummy, dims)
        vjp = tape.gradient(outputs, inputs, output_gradients=epsilon)
        trace = trace + tf.reduce_sum(vjp * epsilon, axis=-1)

    return outputs, trace
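

# Minimal usage sketch (illustrative): for the elementwise square t -> t**2, the exact
# Jacobian trace of each batch element equals sum(2 * t); the input tensor is an assumption.
_x = tf.constant([[1.0, 2.0, 3.0]])
_out, _trace = compute_jacobian_trace(lambda t: t**2, _x)  # _out == [[1., 4., 9.]], _trace == [12.]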


def gaussian_kernel_matrix(x, y, sigmas=None):
    r"""Computes a Gaussian radial basis function (RBF) kernel between the samples of x and y.

    We create a sum of multiple Gaussian kernels, each having a width :math:`\sigma_i`.

    Parameters
    ----------
    x      : tf.Tensor of shape (num_draws_x, num_features)
        Comprises `num_draws_x` random draws from the "source" distribution `P`.
    y      : tf.Tensor of shape (num_draws_y, num_features)
        Comprises `num_draws_y` random draws from the "target" distribution `Q`.
    sigmas : list(float), optional, default: None
        List which denotes the widths of each of the Gaussians in the kernel.
        If `sigmas is None`, a default range will be used, contained in
        ``bayesflow.default_settings.MMD_BANDWIDTH_LIST``

    Returns
    -------
    kernel : tf.Tensor of shape (num_draws_x, num_draws_y)
        The kernel matrix between pairs from `x` and `y`.
    """

    if sigmas is None:
        sigmas = MMD_BANDWIDTH_LIST
    norm = lambda v: tf.reduce_sum(tf.square(v), 1)
    beta = 1.0 / (2.0 * (tf.expand_dims(sigmas, 1)))
    dist = tf.transpose(norm(tf.expand_dims(x, 2) - tf.transpose(y)))
    s = tf.matmul(beta, tf.reshape(dist, (1, -1)))
    kernel = tf.reshape(tf.reduce_sum(tf.exp(-s), 0), tf.shape(dist))
    return kernel


def inverse_multiquadratic_kernel_matrix(x, y, sigmas=None):
    r"""Computes an inverse multiquadratic RBF between the samples of x and y.

    We create a sum of multiple IM-RBF kernels, each having a width :math:`\sigma_i`.

    Parameters
    ----------
    x      : tf.Tensor of shape (num_draws_x, num_features)
        Comprises `num_draws_x` random draws from the "source" distribution `P`.
    y      : tf.Tensor of shape (num_draws_y, num_features)
        Comprises `num_draws_y` random draws from the "target" distribution `Q`.
    sigmas : list(float), optional, default: None
        List which denotes the widths of each of the kernel components.
        If `sigmas is None`, a default range will be used, contained in
        ``bayesflow.default_settings.MMD_BANDWIDTH_LIST``

    Returns
    -------
    kernel : tf.Tensor of shape (num_draws_x, num_draws_y)
        The kernel matrix between pairs from `x` and `y`.
    """

    if sigmas is None:
        sigmas = MMD_BANDWIDTH_LIST
    dist = tf.expand_dims(tf.reduce_sum((x[:, None, :] - y[None, :, :]) ** 2, axis=-1), axis=-1)
    sigmas = tf.expand_dims(sigmas, 0)
    return tf.reduce_sum(sigmas / (dist + sigmas), axis=-1)
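

# Minimal usage sketch (illustrative): pairwise kernel matrices between two equally sized
# sets of random draws; the sample sizes and dimensionality are assumptions.
_draws_p = tf.random.normal((5, 2))
_draws_q = tf.random.normal((5, 2))
_k_gauss = gaussian_kernel_matrix(_draws_p, _draws_q)              # (5, 5) kernel matrix
_k_imq = inverse_multiquadratic_kernel_matrix(_draws_p, _draws_q)  # (5, 5) kernel matrix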


def mmd_kernel(x, y, kernel):
    """Computes the estimator of the Maximum Mean Discrepancy (MMD) between two samples: x and y.

    Maximum Mean Discrepancy (MMD) is a distance-measure between random draws from
    the distributions `x ~ P` and `y ~ Q`.

    Parameters
    ----------
    x      : tf.Tensor of shape (N, num_features)
        An array of `N` random draws from the "source" distribution `x ~ P`.
    y      : tf.Tensor of shape (M, num_features)
        An array of `M` random draws from the "target" distribution `y ~ Q`.
    kernel : callable
        A function which computes the distance between pairs of samples.

    Returns
    -------
    loss   : tf.Tensor of shape (,)
        The statistically biased squared maximum mean discrepancy (MMD) value.
    """

    loss = tf.reduce_mean(kernel(x, x))
    loss += tf.reduce_mean(kernel(y, y))
    loss -= 2 * tf.reduce_mean(kernel(x, y))
    return loss


def mmd_kernel_unbiased(x, y, kernel):
    """Computes the unbiased estimator of the Maximum Mean Discrepancy (MMD) between two samples: x and y.

    Maximum Mean Discrepancy (MMD) is a distance-measure between the samples of
    the distributions `x ~ P` and `y ~ Q`.

    Parameters
    ----------
    x      : tf.Tensor of shape (N, num_features)
        An array of `N` random draws from the "source" distribution `x ~ P`.
    y      : tf.Tensor of shape (M, num_features)
        An array of `M` random draws from the "target" distribution `y ~ Q`.
    kernel : callable
        A function which computes the distance between pairs of random draws from `x` and `y`.

    Returns
    -------
    loss   : tf.Tensor of shape (,)
        The statistically unbiased squared maximum mean discrepancy (MMD) value.
    """

    m, n = x.shape[0], y.shape[0]
    loss = (1.0 / (m * (m + 1))) * tf.reduce_sum(kernel(x, x))
    loss += (1.0 / (n * (n + 1))) * tf.reduce_sum(kernel(y, y))
    loss -= (2.0 / (m * n)) * tf.reduce_sum(kernel(x, y))
    return loss
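

# Minimal usage sketch (illustrative): biased and unbiased MMD estimates between draws from
# two distributions that differ in location; sample sizes and the shift are assumptions.
_p_draws = tf.random.normal((50, 2))
_q_draws = tf.random.normal((60, 2)) + 1.0
_mmd_biased = mmd_kernel(_p_draws, _q_draws, kernel=gaussian_kernel_matrix)
_mmd_unbiased = mmd_kernel_unbiased(_p_draws, _q_draws, kernel=gaussian_kernel_matrix)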


def expected_calibration_error(m_true, m_pred, num_bins=10):
    """Estimates the expected calibration error (ECE) of a model comparison network according to [1].

    [1] Naeini, M. P., Cooper, G., & Hauskrecht, M. (2015).
        Obtaining well calibrated probabilities using Bayesian binning.
        In Proceedings of the AAAI Conference on Artificial Intelligence (Vol. 29, No. 1).

    Notes
    -----
    Make sure that ``m_true`` are **one-hot encoded** classes!

    Parameters
    ----------
    m_true   : np.ndarray of shape (num_sim, num_models)
        The one-hot-encoded true model indices.
    m_pred   : tf.Tensor of shape (num_sim, num_models)
        The predicted posterior model probabilities.
    num_bins : int, optional, default: 10
        The number of bins to use for the calibration curves (and marginal histograms).

    Returns
    -------
    cal_errs : list of length (num_models)
        The ECEs for each model.
    probs    : list of length (num_models)
        The bin information for constructing the calibration curves. Each list contains
        two arrays of length (num_bins) with the predicted and true probabilities for each bin.
    """

    # Convert tf.Tensors to numpy, if passed
    if type(m_true) is not np.ndarray:
        m_true = m_true.numpy()
    if type(m_pred) is not np.ndarray:
        m_pred = m_pred.numpy()

    # Extract number of models and prepare containers
    n_models = m_true.shape[1]
    cal_errs = []
    probs_true = []
    probs_pred = []

    # Loop for each model and compute calibration errs per bin
    for k in range(n_models):
        y_true = (m_true.argmax(axis=1) == k).astype(np.float32)
        y_prob = m_pred[:, k]
        prob_true, prob_pred = calibration_curve(y_true, y_prob, n_bins=num_bins)

        # Compute ECE by weighting bin errors by bin size
        bins = np.linspace(0.0, 1.0, num_bins + 1)
        binids = np.searchsorted(bins[1:-1], y_prob)
        bin_total = np.bincount(binids, minlength=len(bins))
        nonzero = bin_total != 0
        cal_err = np.sum(np.abs(prob_true - prob_pred) * (bin_total[nonzero] / len(y_true)))

        cal_errs.append(cal_err)
        probs_true.append(prob_true)
        probs_pred.append(prob_pred)
    return cal_errs, probs_true, probs_pred
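

# Minimal usage sketch (illustrative): ECE for a three-model comparison with one-hot true
# indices and softmax-normalized predicted probabilities; all values below are assumptions.
_rng_ece = np.random.default_rng(0)
_m_true = np.eye(3)[_rng_ece.integers(0, 3, size=200)]                  # one-hot true model indices
_logits = _rng_ece.normal(size=(200, 3))
_m_pred = np.exp(_logits) / np.exp(_logits).sum(axis=1, keepdims=True)  # predicted model probabilities
_cal_errs, _probs_true, _probs_pred = expected_calibration_error(_m_true, _m_pred, num_bins=10)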


def maximum_mean_discrepancy(source_samples, target_samples, kernel="gaussian", mmd_weight=1.0, minimum=0.0):
    """Computes the MMD given a particular choice of kernel.

    For details, consult Gretton et al. (2012):
    https://www.jmlr.org/papers/volume13/gretton12a/gretton12a.pdf

    Parameters
    ----------
    source_samples : tf.Tensor of shape (N, num_features)
        An array of `N` random draws from the "source" distribution.
    target_samples : tf.Tensor of shape (M, num_features)
        An array of `M` random draws from the "target" distribution.
    kernel         : str in ('gaussian', 'inverse_multiquadratic'), optional, default: 'gaussian'
        The kernel to use for computing the distance between pairs of random draws.
    mmd_weight     : float, optional, default: 1.0
        The weight of the MMD value.
    minimum        : float, optional, default: 0.0
        The lower bound of the MMD value.

    Returns
    -------
    loss_value : tf.Tensor
        A scalar Maximum Mean Discrepancy, shape (,)
    """

    # Determine kernel, fall back to Gaussian if unknown string passed
    if kernel == "gaussian":
        kernel_fun = gaussian_kernel_matrix
    elif kernel == "inverse_multiquadratic":
        kernel_fun = inverse_multiquadratic_kernel_matrix
    else:
        kernel_fun = gaussian_kernel_matrix

    # Compute and return MMD value
    loss_value = mmd_kernel(source_samples, target_samples, kernel=kernel_fun)
    loss_value = mmd_weight * tf.maximum(minimum, loss_value)
    return loss_value
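

# Minimal usage sketch (illustrative): a scalar MMD between "source" and "target" draws that
# differ in location; sample sizes and the shift are assumptions.
_source = tf.random.normal((100, 2))
_target = tf.random.normal((100, 2)) + 0.5
_mmd_value = maximum_mean_discrepancy(_source, _target, kernel="gaussian")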


def get_coverage_probs(z, u):
    """Vectorized function to compute the minimal coverage probability for uniform
    ECDFs given evaluation points z and simulated standard-uniform samples u.

    Parameters
    ----------
    z : np.ndarray of shape (num_points, )
        The vector of evaluation points.
    u : np.ndarray of shape (num_simulations, num_samples)
        The matrix of simulated draws (samples) from U(0, 1).

    Returns
    -------
    gamma : np.ndarray of shape (num_simulations, )
        The minimal coverage probability for each simulated ECDF.
    """

    N = u.shape[1]
    F_m = np.sum((z[:, np.newaxis] >= u[:, np.newaxis, :]), axis=-1) / u.shape[1]
    bin1 = stats.binom(N, z).cdf(N * F_m)
    bin2 = stats.binom(N, z).cdf(N * F_m - 1)
    gamma = 2 * np.min(np.min(np.stack([bin1, 1 - bin2], axis=-1), axis=-1), axis=-1)
    return gamma


def simultaneous_ecdf_bands(
    num_samples, num_points=None, num_simulations=1000, confidence=0.95, eps=1e-5, max_num_points=1000
):
    """Computes the simultaneous ECDF bands through simulation according to the algorithm described in
    Section 2.2:

    https://link.springer.com/content/pdf/10.1007/s11222-022-10090-6.pdf

    Depends on the vectorized utility function `get_coverage_probs(z, u)`.

    Parameters
    ----------
    num_samples     : int
        The sample size used for computing the ECDF. Will equal the number of posterior samples when
        used for calibration. Corresponds to `N` in the paper above.
    num_points      : int, optional, default: None
        The number of evaluation points on the interval (0, 1). Defaults to
        `min(num_samples, max_num_points)` if not explicitly specified. Corresponds to `K` in the paper above.
    num_simulations : int, optional, default: 1000
        The number of samples of size `num_samples` to simulate for determining the simultaneous CIs.
    confidence      : float in (0, 1), optional, default: 0.95
        The confidence level, `confidence = 1 - alpha` specifies the width of the confidence interval.
    eps             : float, optional, default: 1e-5
        Small number to add to the lower and subtract from the upper bound of the interval [0, 1]
        to avoid edge artefacts. No need to touch this.
    max_num_points  : int, optional, default: 1000
        Upper bound on `num_points`. Saves computation time when `num_samples` is large.

    Returns
    -------
    (alpha, z, L, U) - tuple of scalar and three arrays of size (num_points,) containing the confidence level
        as well as the evaluation points, the lower, and the upper confidence bands, respectively.
    """

    # Use shorter var names throughout
    N = num_samples
    if num_points is None:
        K = min(N, max_num_points)
    else:
        K = min(num_points, max_num_points)
    M = num_simulations

    # Specify evaluation points
    z = np.linspace(0 + eps, 1 - eps, K)

    # Simulate M samples of size N
    u = np.random.uniform(size=(M, N))

    # Get alpha
    alpha = 1 - confidence

    # Compute minimal coverage probabilities
    gammas = get_coverage_probs(z, u)

    # Use insights from paper to compute lower and upper confidence interval
    gamma = np.percentile(gammas, 100 * alpha)
    L = stats.binom(N, z).ppf(gamma / 2) / N
    U = stats.binom(N, z).ppf(1 - gamma / 2) / N
    return alpha, z, L, U
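

# Minimal usage sketch (illustrative): 95% simultaneous confidence bands for the ECDF of
# 250 posterior draws; the sample size is an assumption.
_alpha, _z, _lower, _upper = simultaneous_ecdf_bands(num_samples=250)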


def mean_squared_error(x_true, x_pred):
    """Computes the mean squared error between a single true value and M estimates thereof.

    Parameters
    ----------
    x_true : float or np.ndarray
        true values, shape ()
    x_pred : np.ndarray
        predicted values, shape (M, )

    Returns
    -------
    out : float
        The MSE between ``x_true`` and ``x_pred``
    """

    x_true = np.array(x_true)
    x_pred = np.array(x_pred)
    try:
        return np.mean((x_true[np.newaxis, :] - x_pred) ** 2)
    except IndexError:
        return np.mean((x_true - x_pred) ** 2)


def root_mean_squared_error(x_true, x_pred):
    """Computes the root mean squared error (RMSE) between a single true value and M estimates thereof.

    Parameters
    ----------
    x_true : float or np.ndarray
        true values, shape ()
    x_pred : np.ndarray
        predicted values, shape (M, )

    Returns
    -------
    out : float
        The RMSE between ``x_true`` and ``x_pred``
    """

    mse = mean_squared_error(x_true=x_true, x_pred=x_pred)
    return np.sqrt(mse)
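

# Minimal usage sketch (illustrative): error between one true value and three estimates of it.
_mse = mean_squared_error(1.0, np.array([0.8, 1.1, 1.3]))        # ~0.0467
_rmse = root_mean_squared_error(1.0, np.array([0.8, 1.1, 1.3]))  # ~0.216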


def aggregated_error(x_true, x_pred, inner_error_fun=root_mean_squared_error, outer_aggregation_fun=np.mean):
    """Computes the aggregated error between a vector of N true values and M estimates of each true value.

    Parameters
    ----------
    x_true : np.ndarray
        true values, shape (N)
    x_pred : np.ndarray
        predicted values, shape (N, M), i.e., M estimates for each of the N true values
    inner_error_fun : callable, default: root_mean_squared_error
        computes the error between one true value and M estimates thereof
    outer_aggregation_fun : callable, default: np.mean
        aggregates N errors to a single aggregated error value

    Returns
    -------
    The aggregated error value (a scalar for the default ``np.mean`` aggregation).
    """

    x_true, x_pred = np.array(x_true), np.array(x_pred)

    N = x_pred.shape[0]
    if not N == x_true.shape[0]:
        raise ShapeError

    errors = np.array([inner_error_fun(x_true=x_true[i], x_pred=x_pred[i]) for i in range(N)])

    if not N == errors.shape[0]:
        raise ShapeError

    return outer_aggregation_fun(errors)


def aggregated_rmse(x_true, x_pred):
    """Computes the aggregated RMSE for a matrix of predictions.

    Parameters
    ----------
    x_true : np.ndarray
        true values, shape (N)
    x_pred : np.ndarray
        predicted values, shape (N, M), i.e., M estimates for each of the N true values

    Returns
    -------
    The aggregated RMSE.
    """

    return aggregated_error(
        x_true=x_true, x_pred=x_pred, inner_error_fun=root_mean_squared_error, outer_aggregation_fun=np.mean
    )
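

# Minimal usage sketch (illustrative): aggregated RMSE for 3 true values with 4 estimates each;
# rows of the prediction matrix correspond to true values, columns to estimates (values are assumptions).
_truths = np.array([0.0, 1.0, 2.0])
_estimates = np.array([
    [0.1, -0.2, 0.0, 0.3],
    [0.9, 1.2, 1.1, 0.8],
    [2.2, 1.9, 2.0, 2.1],
])
_agg_rmse = aggregated_rmse(_truths, _estimates)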


def c2st(
    source_samples,
    target_samples,
    n_folds=5,
    scoring="accuracy",
    normalize=True,
    seed=123,
    hidden_units_per_dim=16,
    aggregate_output=True,
):
    """C2ST metric [1] using a scikit-learn neural network classifier (i.e., MLP).

    Code adapted from https://github.com/sbi-benchmark/sbibm/blob/main/sbibm/metrics/c2st.py

    [1] Lopez-Paz, D., & Oquab, M. (2016). Revisiting classifier two-sample tests. arXiv:1610.06545.

    Parameters
    ----------
    source_samples       : np.ndarray or tf.Tensor
        Source samples (e.g., approximate posterior samples)
    target_samples       : np.ndarray or tf.Tensor
        Target samples (e.g., samples from a reference posterior)
    n_folds              : int, optional, default: 5
        Number of folds in k-fold cross-validation for the classifier evaluation
    scoring              : str, optional, default: "accuracy"
        Evaluation score of the sklearn MLP classifier
    normalize            : bool, optional, default: True
        Whether the data shall be z-standardized relative to source_samples
    seed                 : int, optional, default: 123
        RNG seed for the MLP and k-fold CV
    hidden_units_per_dim : int, optional, default: 16
        Number of hidden units in the MLP, relative to the input dimensions.
        Example: source samples are 5D, hidden_units_per_dim=16 -> 80 hidden units per layer
    aggregate_output     : bool, optional, default: True
        Whether to return a single value aggregated over all cross-validation runs
        or all values from all runs. If left at default, the empirical mean will be returned.

    Returns
    -------
    c2st_score : float or np.ndarray
        The resulting C2ST score: the mean over all cross-validation runs if ``aggregate_output``
        is True, otherwise the array of per-fold scores.
    """

    x = np.array(source_samples)
    y = np.array(target_samples)

    num_dims = x.shape[1]
    if not num_dims == y.shape[1]:
        raise ShapeError(
            "source_samples and target_samples can have different numbers of observations (1st dim) "
            "but must have the same dimensionality (2nd dim). "
            f"Found: source_samples {source_samples.shape[1]}, target_samples {target_samples.shape[1]}"
        )

    if normalize:
        x_mean = np.mean(x, axis=0)
        x_std = np.std(x, axis=0)
        x = (x - x_mean) / x_std
        y = (y - x_mean) / x_std

    clf = MLPClassifier(
        activation="relu",
        hidden_layer_sizes=(hidden_units_per_dim * num_dims, hidden_units_per_dim * num_dims),
        max_iter=10000,
        solver="adam",
        random_state=seed,
    )

    data = np.concatenate((x, y))
    target = np.concatenate(
        (
            np.zeros((x.shape[0],)),
            np.ones((y.shape[0],)),
        )
    )

    shuffle = KFold(n_splits=n_folds, shuffle=True, random_state=seed)
    scores = cross_val_score(clf, data, target, cv=shuffle, scoring=scoring)

    if aggregate_output:
        c2st_score = np.asarray(np.mean(scores)).astype(np.float32)
    else:
        c2st_score = scores
    return c2st_score
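

# Minimal usage sketch (illustrative): a C2ST accuracy near 0.5 indicates the classifier cannot
# distinguish the two sample sets; the draws and sample sizes below are assumptions.
_approx_draws = np.random.default_rng(7).normal(size=(100, 2))
_reference_draws = np.random.default_rng(8).normal(size=(100, 2))
_c2st_score = c2st(_approx_draws, _reference_draws, n_folds=5)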