Source code for bayesflow.helper_networks

# Copyright (c) 2022 The BayesFlow Developers

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from functools import partial

import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Conv1D, Dense, Dropout
from tensorflow.keras.models import Sequential

from bayesflow.exceptions import ConfigurationError
from bayesflow.wrappers import SpectralNormalization


class DenseCouplingNet(tf.keras.Model):
    """Implements a conditional version of a standard fully connected (FC) network.
    Would also work as an unconditional estimator."""

    def __init__(self, settings, dim_out, **kwargs):
        """Creates a conditional coupling net (FC neural network).

        Parameters
        ----------
        settings : dict
            A dictionary holding arguments for a dense layer:
            See https://www.tensorflow.org/api_docs/python/tf/keras/layers/Dense
            As well as custom arguments for settings such as residual networks,
            dropout, and spectral normalization.
        dim_out  : int
            Number of outputs of the coupling net. Determined internally by the consumer classes.
        **kwargs : dict, optional, default: {}
            Optional keyword arguments passed to the `tf.keras.Model` constructor.
        """

        super().__init__(**kwargs)

        # Create network body (input and hidden layers)
        self.fc = Sequential()
        for _ in range(settings["num_dense"]):
            # Create dense layer with dict kwargs
            layer = Dense(**settings["dense_args"])
            # Wrap in spectral normalization, if specified
            if settings.get("spec_norm") is True:
                layer = SpectralNormalization(layer)
            self.fc.add(layer)

        # Figure out which dropout to use, MC has precedence over standard
        # Fails gently, if no dropout_prob is specified
        # Case both specified, MC wins
        if settings.get("dropout") and settings.get("mc_dropout"):
            self.fc.add(MCDropout(dropout_prob=settings["dropout_prob"]))
        # Case only dropout, use standard
        elif settings.get("dropout") and not settings.get("mc_dropout"):
            self.fc.add(Dropout(rate=settings["dropout_prob"]))
        # Case only MC, use MC
        elif not settings.get("dropout") and settings.get("mc_dropout"):
            self.fc.add(MCDropout(dropout_prob=settings["dropout_prob"]))
        # No dropout
        else:
            pass

        # Set residual flag
        if settings.get("residual"):
            self.fc.add(Dense(dim_out, **{k: v for k, v in settings["dense_args"].items() if k != "units"}))
            self.residual_output = Dense(dim_out, kernel_initializer="zeros")
        else:
            self.fc.add(Dense(dim_out, kernel_initializer="zeros"))
            self.residual_output = None
        self.fc.build(input_shape=())

    def call(self, target, condition, **kwargs):
        """Concatenates target and condition and performs a forward pass through the coupling net.

        Parameters
        ----------
        target    : tf.Tensor
            The split estimation quantities, for instance, parameters :math:`\\theta \\sim p(\\theta)`
            of interest, shape (batch_size, ...)
        condition : tf.Tensor or None
            The conditioning vector of interest, for instance ``x = summary(x)``, shape (batch_size, summary_dim)
        """

        # Handle case no condition
        if condition is None:
            if self.residual_output is not None:
                return self.residual_output(self.fc(target, **kwargs) + target, **kwargs)
            else:
                return self.fc(target, **kwargs)

        # Handle 3D case for a set-flow and repeat condition over
        # the second `time` or `n_observations` axis of ``target``
        if len(tf.shape(target)) == 3 and len(tf.shape(condition)) == 2:
            shape = tf.shape(target)
            condition = tf.expand_dims(condition, 1)
            condition = tf.tile(condition, [1, shape[1], 1])
        inp = tf.concat((target, condition), axis=-1)
        out = self.fc(inp, **kwargs)
        if self.residual_output is not None:
            out = self.residual_output(out + target, **kwargs)
        return out

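# Usage sketch (editorial addition, not part of the original module): builds a small coupling
# net from an assumed settings dict; only the keys read in ``__init__`` above are used here,
# and the concrete values are illustrative, not library defaults.
def _demo_dense_coupling_net():
    settings = {
        "num_dense": 2,
        "dense_args": dict(units=64, activation="relu"),
        "dropout": True,
        "dropout_prob": 0.05,
        "residual": False,
    }
    net = DenseCouplingNet(settings, dim_out=3)
    target = tf.random.normal((16, 3))
    condition = tf.random.normal((16, 8))
    return net(target, condition)  # shape (16, 3)
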
class Permutation(tf.keras.Model):
    """Implements a layer to permute the inputs entering a (conditional) coupling layer. Uses fixed
    permutations, as these perform equally well compared to learned permutations."""

    def __init__(self, input_dim):
        """Creates an invertible permutation layer for a conditional invertible layer.

        Parameters
        ----------
        input_dim : int
            The dimensionality of the input to the (conditional) coupling layer.
        """

        super().__init__()

        permutation_vec = np.random.permutation(input_dim)
        inv_permutation_vec = np.argsort(permutation_vec)
        self.permutation = tf.Variable(
            initial_value=permutation_vec, trainable=False, dtype=tf.int32, name="permutation"
        )
        self.inv_permutation = tf.Variable(
            initial_value=inv_permutation_vec, trainable=False, dtype=tf.int32, name="inv_permutation"
        )

    def call(self, target, inverse=False):
        """Permutes a batch of target vectors over the last axis.

        Parameters
        ----------
        target  : tf.Tensor of shape (batch_size, ...)
            The target vector to be permuted over its last axis.
        inverse : bool, optional, default: False
            Controls if the current pass is forward (``inverse=False``) or inverse (``inverse=True``).

        Returns
        -------
        out : tf.Tensor of the same shape as `target`.
            The (un-)permuted target vector.
        """

        if not inverse:
            return self._forward(target)
        else:
            return self._inverse(target)

    def _forward(self, target):
        """Performs a fixed permutation over the last axis."""

        return tf.gather(target, self.permutation, axis=-1)

    def _inverse(self, target):
        """Undoes the fixed permutation over the last axis."""

        return tf.gather(target, self.inv_permutation, axis=-1)

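# Usage sketch (editorial addition, not part of the original module): the fixed permutation
# composed with its inverse recovers the input exactly. The shapes below are arbitrary examples.
def _demo_permutation_roundtrip():
    perm = Permutation(input_dim=4)
    x = tf.random.normal((8, 4))
    tf.debugging.assert_near(x, perm(perm(x), inverse=True))
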
class Orthogonal(tf.keras.Model):
    """Implements a learnable orthogonal transformation according to [1]. Can be used as an
    alternative to a fixed ``Permutation`` layer.

    [1] Kingma, D. P., & Dhariwal, P. (2018). Glow: Generative flow with invertible 1x1 convolutions.
    Advances in neural information processing systems, 31.
    """

    def __init__(self, input_dim):
        """Creates an invertible orthogonal transformation (generalized permutation).

        Parameters
        ----------
        input_dim : int
            The dimensionality of the input to the (conditional) coupling layer.
        """

        super().__init__()

        init = tf.keras.initializers.Orthogonal()
        self.W = tf.Variable(
            initial_value=init(shape=(input_dim, input_dim)),
            trainable=True,
            dtype=tf.float32,
            name="learnable_permute",
        )

    def call(self, target, inverse=False):
        """Transforms a batch of target vectors over the last axis through an approximately
        orthogonal transform.

        Parameters
        ----------
        target  : tf.Tensor of shape (batch_size, ...)
            The target vector to be rotated over its last axis.
        inverse : bool, optional, default: False
            Controls if the current pass is forward (``inverse=False``) or inverse (``inverse=True``).

        Returns
        -------
        out : tf.Tensor of the same shape as `target`.
            The (un-)rotated target vector.
        """

        if not inverse:
            return self._forward(target)
        else:
            return self._inverse(target)

    def _forward(self, target):
        """Performs a learnable generalized permutation over the last axis."""

        shape = tf.shape(target)
        rank = len(shape)
        log_det = tf.math.log(tf.math.abs(tf.linalg.det(self.W)))
        if rank == 2:
            z = tf.linalg.matmul(target, self.W)
        else:
            z = tf.tensordot(target, self.W, [[rank - 1], [0]])
            log_det = tf.cast(shape[1], tf.float32) * log_det
        return z, log_det

    def _inverse(self, z):
        """Undoes the learnable generalized permutation over the last axis."""

        W_inv = tf.linalg.inv(self.W)
        rank = len(tf.shape(z))
        if rank == 2:
            return tf.linalg.matmul(z, W_inv)
        return tf.tensordot(z, W_inv, [[rank - 1], [0]])

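# Usage sketch (editorial addition, not part of the original module): the forward pass returns
# the rotated tensor together with the log-determinant of the Jacobian, while the inverse pass
# returns only the recovered input (up to float32 round-off).
def _demo_orthogonal_roundtrip():
    rot = Orthogonal(input_dim=4)
    x = tf.random.normal((8, 4))
    z, log_det = rot(x)
    x_rec = rot(z, inverse=True)
    tf.debugging.assert_near(x, x_rec, atol=1e-5)
    return log_det
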
class MCDropout(tf.keras.Model):
    """Implements Monte Carlo Dropout as a Bayesian approximation according to [1].

    Perhaps not the best approximation, but arguably the cheapest one out there!

    [1] Gal, Y., & Ghahramani, Z. (2016, June). Dropout as a Bayesian approximation: Representing model
    uncertainty in deep learning. In International Conference on Machine Learning (pp. 1050-1059). PMLR.
    """

    def __init__(self, dropout_prob=0.1, **kwargs):
        """Creates a custom instance of an MC Dropout layer. Will be used both during
        training and inference.

        Parameters
        ----------
        dropout_prob : float, optional, default: 0.1
            The dropout rate to be passed to ``tf.keras.layers.Dropout()``.
        """

        super().__init__(**kwargs)
        self.drop = Dropout(dropout_prob)

    def call(self, inputs):
        """Randomly sets elements of ``inputs`` to zero.

        Parameters
        ----------
        inputs : tf.Tensor
            Input of shape (batch_size, ...)

        Returns
        -------
        out : tf.Tensor
            Output of shape (batch_size, ...), same as ``inputs``.
        """

        out = self.drop(inputs, training=True)
        return out

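# Usage sketch (editorial addition, not part of the original module): dropout remains active
# at inference time, so repeated passes over the same input give different outputs, which can
# serve as cheap Monte Carlo samples for uncertainty estimation.
def _demo_mc_dropout_variability():
    mc_drop = MCDropout(dropout_prob=0.5)
    x = tf.ones((4, 10))
    samples = tf.stack([mc_drop(x) for _ in range(10)], axis=0)  # shape (10, 4, 10)
    return tf.math.reduce_std(samples, axis=0)  # non-zero wherever dropout masks differ
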
class ActNorm(tf.keras.Model):
    """Implements an Activation Normalization (ActNorm) Layer.
    Activation Normalization is a learned, invertible normalization using a scale (s) and a bias (b)
    vector::

       y = s * x + b (forward)
       x = (y - b) / s (inverse)

    Notes
    -----
    The scale and bias can be initialized in a data-dependent manner, such that the output has a mean
    of zero and a standard deviation of one [1]_ [2]_. Alternatively, they are initialized with vectors
    of ones (scale) and zeros (bias).

    References
    ----------
    .. [1] Kingma, Diederik P., and Prafulla Dhariwal. "Glow: Generative flow with invertible 1x1
       convolutions." arXiv preprint arXiv:1807.03039 (2018).

    .. [2] Salimans, Tim, and Durk P. Kingma. "Weight normalization: A simple reparameterization to
       accelerate training of deep neural networks." Advances in neural information processing
       systems 29 (2016): 901-909.
    """

    def __init__(self, latent_dim, act_norm_init, **kwargs):
        """Creates an instance of an ActNorm Layer as proposed by [1].

        Parameters
        ----------
        latent_dim    : int
            The dimensionality of the latent space (equal to the dimensionality of the target variable)
        act_norm_init : np.ndarray of shape (num_simulations, num_params) or None, optional, default: None
            Optional data-dependent initialization for the internal ``ActNorm`` layers, as done in [1].
            Could be helpful for deep invertible networks.
        """

        super().__init__(**kwargs)

        # Initialize scale with ones and bias with zeros if no batch for initialization was provided.
        if act_norm_init is None:
            self.scale = tf.Variable(tf.ones((latent_dim,)), trainable=True, name="act_norm_scale")
            self.bias = tf.Variable(tf.zeros((latent_dim,)), trainable=True, name="act_norm_bias")
        else:
            self._initalize_parameters_data_dependent(act_norm_init)

    def call(self, target, inverse=False):
        """Performs one pass through the ActNorm layer (either forward or inverse) and normalizes
        the last axis of `target`.

        Parameters
        ----------
        target  : tf.Tensor of shape (batch_size, ...)
            The target variables of interest, i.e., parameters for posterior estimation
        inverse : bool, optional, default: False
            Flag indicating whether to run the block forward or backwards

        Returns
        -------
        (z, log_det_J) : tuple(tf.Tensor, tf.Tensor)
            If inverse=False: The transformed input and the corresponding Jacobian of the transformation,
            z shape: (batch_size, inp_dim), log_det_J shape: (,)

        target         : tf.Tensor
            If inverse=True: The inversely transformed targets, shape == target.shape

        Notes
        -----
        If ``inverse=False``, the return is ``(z, log_det_J)``.\n
        If ``inverse=True``, the return is ``target``.
        """

        if not inverse:
            return self._forward(target)
        else:
            return self._inverse(target)

    def _forward(self, target):
        """Performs a forward pass through the layer."""

        z = self.scale * target + self.bias
        ldj = tf.math.reduce_sum(tf.math.log(tf.math.abs(self.scale)), axis=-1)
        return z, ldj

    def _inverse(self, target):
        """Performs an inverse pass through the layer."""

        return (target - self.bias) / self.scale

    def _initalize_parameters_data_dependent(self, init_data):
        """Performs a data-dependent initialization of the scale and bias.

        Initializes the scale and bias vector as proposed by [1], such that the layer output has a
        mean of zero and a standard deviation of one.

        [1] Salimans, Tim, and Durk P. Kingma. Weight normalization: A simple reparameterization to
            accelerate training of deep neural networks. Advances in neural information processing
            systems 29 (2016): 901-909.

        Parameters
        ----------
        init_data : tf.Tensor of shape (batch_size, num_params)
            Initial values used to estimate the scale and bias parameters by computing the mean and
            standard deviation along the first dimension of `init_data`.
        """

        # 2D Tensor case, assume first batch dimension
        if tf.rank(init_data) == 2:
            mean = tf.math.reduce_mean(init_data, axis=0)
            std = tf.math.reduce_std(init_data, axis=0)
        # 3D Tensor case, assume first batch dimension, second number of observations dimension
        elif tf.rank(init_data) == 3:
            mean = tf.math.reduce_mean(init_data, axis=(0, 1))
            std = tf.math.reduce_std(init_data, axis=(0, 1))
        # Raise error for other cases
        else:
            raise ConfigurationError(
                f"""Currently, ActNorm supports only 2D and 3D Tensors,
                but act_norm_init contains data with shape {init_data.shape}."""
            )

        scale = 1.0 / std
        bias = (-1.0 * mean) / std

        self.scale = tf.Variable(scale, trainable=True, name="act_norm_scale")
        self.bias = tf.Variable(bias, trainable=True, name="act_norm_bias")

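# Usage sketch (editorial addition, not part of the original module): with a data-dependent
# initialization batch, the first forward pass maps that batch to (approximately) zero mean and
# unit standard deviation per dimension. The batch below is a synthetic example.
def _demo_actnorm_data_dependent_init():
    init_batch = np.random.default_rng(42).normal(loc=3.0, scale=2.0, size=(256, 5)).astype(np.float32)
    act_norm = ActNorm(latent_dim=5, act_norm_init=init_batch)
    z, log_det_J = act_norm(tf.constant(init_batch))
    return tf.math.reduce_mean(z, axis=0), tf.math.reduce_std(z, axis=0)  # ~0 and ~1
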
class InvariantModule(tf.keras.Model):
    """Implements an invariant module performing a permutation-invariant transform.

    For details and rationale, see:

    [1] Bloem-Reddy, B., & Teh, Y. W. (2020). Probabilistic Symmetries and Invariant Neural Networks.
    J. Mach. Learn. Res., 21, 90-1.
    https://www.jmlr.org/papers/volume21/19-322/19-322.pdf
    """

    def __init__(self, settings, **kwargs):
        """Creates an invariant module according to [1] which represents a learnable permutation-invariant
        function with an option for learnable pooling.

        Parameters
        ----------
        settings : dict
            A dictionary holding the configuration settings for the module.
        **kwargs : dict, optional, default: {}
            Optional keyword arguments passed to the `tf.keras.Model` constructor.
        """

        super().__init__(**kwargs)

        # Create internal functions
        self.s1 = Sequential([Dense(**settings["dense_s1_args"]) for _ in range(settings["num_dense_s1"])])
        self.s2 = Sequential([Dense(**settings["dense_s2_args"]) for _ in range(settings["num_dense_s2"])])

        # Pick pooling function
        if settings["pooling_fun"] == "mean":
            pooling_fun = partial(tf.reduce_mean, axis=-2)
        elif settings["pooling_fun"] == "max":
            pooling_fun = partial(tf.reduce_max, axis=-2)
        else:
            if callable(settings["pooling_fun"]):
                pooling_fun = settings["pooling_fun"]
            else:
                raise ConfigurationError("pooling_fun argument not understood!")
        self.pooler = pooling_fun

    def call(self, x, **kwargs):
        """Performs the forward pass of a learnable invariant transform.

        Parameters
        ----------
        x : tf.Tensor
            Input of shape (batch_size, ..., x_dim)

        Returns
        -------
        out : tf.Tensor
            Output of shape (batch_size, ..., out_dim)
        """

        x_reduced = self.pooler(self.s1(x, **kwargs))
        out = self.s2(x_reduced, **kwargs)
        return out

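# Usage sketch (editorial addition, not part of the original module): the settings dict below is
# an assumed example. The output does not change when the set (second) axis is permuted, up to
# floating-point error from the pooling order.
def _demo_invariant_module():
    settings = {
        "num_dense_s1": 2,
        "num_dense_s2": 2,
        "dense_s1_args": dict(units=32, activation="relu"),
        "dense_s2_args": dict(units=32, activation="relu"),
        "pooling_fun": "mean",
    }
    inv = InvariantModule(settings)
    x = tf.random.normal((8, 10, 3))  # (batch_size, set_size, x_dim)
    tf.debugging.assert_near(inv(x), inv(tf.reverse(x, axis=[1])), atol=1e-5)
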
class EquivariantModule(tf.keras.Model):
    """Implements an equivariant module performing an equivariant transform.

    For details and justification, see:

    [1] Bloem-Reddy, B., & Teh, Y. W. (2020). Probabilistic Symmetries and Invariant Neural Networks.
    J. Mach. Learn. Res., 21, 90-1.
    https://www.jmlr.org/papers/volume21/19-322/19-322.pdf
    """

    def __init__(self, settings, **kwargs):
        """Creates an equivariant module according to [1] which combines equivariant transforms
        with nested invariant transforms, thereby enabling interactions between set members.

        Parameters
        ----------
        settings : dict
            A dictionary holding the configuration settings for the module.
        **kwargs : dict, optional, default: {}
            Optional keyword arguments passed to the ``tf.keras.Model`` constructor.
        """

        super().__init__(**kwargs)

        self.invariant_module = InvariantModule(settings)
        self.s3 = Sequential([Dense(**settings["dense_s3_args"]) for _ in range(settings["num_dense_s3"])])

    def call(self, x, **kwargs):
        """Performs the forward pass of a learnable equivariant transform.

        Parameters
        ----------
        x : tf.Tensor
            Input of shape (batch_size, ..., x_dim)

        Returns
        -------
        out : tf.Tensor
            Output of shape (batch_size, ..., equiv_dim)
        """

        # Store shape of x, will be (batch_size, ..., some_dim)
        shape = tf.shape(x)

        # Example: Output dim is (batch_size, inv_dim) -> (batch_size, N, inv_dim)
        out_inv = self.invariant_module(x, **kwargs)
        out_inv = tf.expand_dims(out_inv, -2)
        tiler = [1] * len(shape)
        tiler[-2] = shape[-2]
        out_inv_rep = tf.tile(out_inv, tiler)

        # Concatenate each x with the repeated invariant embedding
        out_c = tf.concat([x, out_inv_rep], axis=-1)

        # Pass through equivariant func
        out = self.s3(out_c, **kwargs)
        return out

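# Usage sketch (editorial addition, not part of the original module): the settings dict below is
# an assumed example. Permuting the set axis before the transform permutes the output in the same
# way (equivariance); the two results below agree up to floating-point error.
def _demo_equivariant_module():
    settings = {
        "num_dense_s1": 1,
        "num_dense_s2": 1,
        "num_dense_s3": 1,
        "dense_s1_args": dict(units=16, activation="relu"),
        "dense_s2_args": dict(units=16, activation="relu"),
        "dense_s3_args": dict(units=16, activation="relu"),
        "pooling_fun": "mean",
    }
    equiv = EquivariantModule(settings)
    x = tf.random.normal((8, 10, 3))  # (batch_size, set_size, x_dim)
    out_then_permute = tf.reverse(equiv(x), axis=[1])
    permute_then_out = equiv(tf.reverse(x, axis=[1]))
    return out_then_permute, permute_then_out
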
class MultiConv1D(tf.keras.Model):
    """Implements an inception-inspired 1D convolutional layer using different kernel sizes."""

    def __init__(self, settings, **kwargs):
        """Creates an inception-like Conv1D layer.

        Parameters
        ----------
        settings : dict
            A dictionary which holds the arguments for the internal ``Conv1D`` layers.
        """

        super().__init__(**kwargs)

        # Create a list of Conv1D layers with different kernel sizes
        # ranging from 'min_kernel_size' to 'max_kernel_size'
        self.convs = [
            Conv1D(kernel_size=f, **settings["layer_args"])
            for f in range(settings["min_kernel_size"], settings["max_kernel_size"])
        ]

        # Create final Conv1D layer for dimensionality reduction
        dim_red_args = {k: v for k, v in settings["layer_args"].items() if k not in ["kernel_size", "strides"]}
        dim_red_args["kernel_size"] = 1
        dim_red_args["strides"] = 1
        self.dim_red = Conv1D(**dim_red_args)

    def call(self, x, **kwargs):
        """Performs a forward pass through the layer.

        Parameters
        ----------
        x : tf.Tensor
            Input of shape (batch_size, n_time_steps, n_time_series)

        Returns
        -------
        out : tf.Tensor
            Output of shape (batch_size, n_time_steps, n_filters)
        """

        out = self._multi_conv(x, **kwargs)
        out = self.dim_red(out, **kwargs)
        return out

    def _multi_conv(self, x, **kwargs):
        """Applies the convolutions with different sizes and concatenates outputs."""

        return tf.concat([conv(x, **kwargs) for conv in self.convs], axis=-1)

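# Usage sketch (editorial addition, not part of the original module): the layer arguments below
# are assumed examples. Kernel sizes 1, 2, and 3 are applied in parallel and the final kernel-size-1
# convolution reduces the concatenated channels back to ``filters``.
def _demo_multi_conv1d():
    settings = {
        "layer_args": dict(filters=8, activation="relu", padding="same"),
        "min_kernel_size": 1,
        "max_kernel_size": 4,
    }
    conv = MultiConv1D(settings)
    x = tf.random.normal((16, 100, 2))  # (batch_size, n_time_steps, n_time_series)
    return conv(x)  # shape (16, 100, 8)
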
class ConfigurableMLP(tf.keras.Model):
    """Implements a simple configurable MLP with optional residual connections and dropout."""

    def __init__(
        self,
        input_dim,
        hidden_dim=512,
        output_dim=None,
        num_hidden=2,
        activation="relu",
        residual=True,
        dropout_rate=0.05,
        **kwargs,
    ):
        """Creates an instance of a flexible and simple MLP with optional residual connections and dropout.

        Parameters
        ----------
        input_dim    : int
            The input dimensionality
        hidden_dim   : int, optional, default: 512
            The dimensionality of the hidden layers
        output_dim   : int, optional, default: None
            The output dimensionality. If None is passed, `output_dim` is set to `input_dim`
        num_hidden   : int, optional, default: 2
            The number of hidden layers (minimum: 1)
        activation   : string, optional, default: 'relu'
            The activation function of the dense layers
        residual     : bool, optional, default: True
            Use residual connections in the MLP
        dropout_rate : float, optional, default: 0.05
            Dropout rate for the hidden layers in the MLP
        """

        super().__init__(**kwargs)

        self.input_dim = input_dim
        self.output_dim = input_dim if output_dim is None else output_dim
        self.model = tf.keras.Sequential(
            [tf.keras.layers.Dense(hidden_dim, activation=activation), tf.keras.layers.Dropout(dropout_rate)]
        )
        for _ in range(num_hidden):
            self.model.add(
                ConfigurableHiddenBlock(
                    hidden_dim,
                    activation=activation,
                    residual=residual,
                    dropout_rate=dropout_rate,
                )
            )
        self.model.add(tf.keras.layers.Dense(self.output_dim))

    def call(self, inputs, **kwargs):
        """Performs a forward pass through the MLP."""

        return self.model(inputs, **kwargs)

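# Usage sketch (editorial addition, not part of the original module): with ``output_dim=None``,
# the MLP maps inputs back to their original dimensionality. The argument values are illustrative.
def _demo_configurable_mlp():
    mlp = ConfigurableMLP(input_dim=4, hidden_dim=64, num_hidden=2, dropout_rate=0.0)
    x = tf.random.normal((32, 4))
    return mlp(x)  # shape (32, 4)
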
class ConfigurableHiddenBlock(tf.keras.Model):
    """Implements a hidden block with optional residual connection and dropout, used as a
    building block by ``ConfigurableMLP``."""

    def __init__(self, num_units, activation="relu", residual=True, dropout_rate=0.0):
        super().__init__()

        self.act_fn = tf.keras.activations.get(activation)
        self.residual = residual
        self.dense_with_dropout = tf.keras.Sequential(
            [tf.keras.layers.Dense(num_units, activation=None), tf.keras.layers.Dropout(dropout_rate)]
        )

    def call(self, inputs, **kwargs):
        x = self.dense_with_dropout(inputs, **kwargs)
        if self.residual:
            x = x + inputs
        return self.act_fn(x)