Source code for bayesflow.workflows.basic_workflow

from collections.abc import Mapping, Sequence, Callable

import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import keras

from bayesflow.datasets import OnlineDataset, OfflineDataset, DiskDataset
from bayesflow.networks import InferenceNetwork, PointInferenceNetwork, SummaryNetwork
from bayesflow.simulators import Simulator
from bayesflow.adapters import Adapter
from bayesflow.approximators import ContinuousApproximator, PointApproximator
from bayesflow.types import Shape
from bayesflow.utils import find_inference_network, find_summary_network, logging
from bayesflow.diagnostics import metrics as bf_metrics
from bayesflow.diagnostics import plots as bf_plots

from .workflow import Workflow


class BasicWorkflow(Workflow):
    def __init__(
        self,
        simulator: Simulator = None,
        adapter: Adapter = None,
        inference_network: InferenceNetwork | str = "coupling_flow",
        summary_network: SummaryNetwork | str = None,
        initial_learning_rate: float = 5e-4,
        optimizer: keras.optimizers.Optimizer | type = None,
        checkpoint_filepath: str = None,
        checkpoint_name: str = "model",
        save_weights_only: bool = False,
        save_best_only: bool = False,
        inference_variables: Sequence[str] | str = None,
        inference_conditions: Sequence[str] | str = None,
        summary_variables: Sequence[str] | str = None,
        standardize: Sequence[str] | str = "inference_variables",
        **kwargs,
    ):
        """
        This class provides methods to set up, simulate, fit, and validate models using amortized
        Bayesian inference. It allows for both online and offline amortized workflows.

        Parameters
        ----------
        simulator : Simulator, optional
            A Simulator object to generate synthetic data for inference (default is None).
        adapter : Adapter, optional
            Adapter for data processing. If not provided, a default adapter will be used
            (default is None), but you need to make sure to provide the correct names for
            `inference_variables` and/or `inference_conditions` and/or `summary_variables`.
        inference_network : InferenceNetwork or str, optional
            The inference network used for posterior approximation, specified as an instance or by
            name (default is "coupling_flow").
        summary_network : SummaryNetwork or str, optional
            The summary network used for data summarization, specified as an instance or by name
            (default is None).
        initial_learning_rate : float, optional
            Initial learning rate for the optimizer (default is 5e-4).
        optimizer : keras.optimizers.Optimizer or type, optional
            The optimizer to be used for training. If None, a default optimizer (Adam or AdamW with
            a cosine decay schedule) is configured when training starts (default is None).
        checkpoint_filepath : str, optional
            Directory path where model checkpoints will be saved (default is None).
        checkpoint_name : str, optional
            Name of the checkpoint file (default is "model").
        save_weights_only : bool, optional
            If True, only the model weights will be saved during checkpointing (default is False).
        save_best_only : bool, optional
            If True, only the best model according to the quantity monitored (training or validation
            loss) at the end of each epoch will be saved, instead of the last model (default is False).
            Use with caution, as some losses (e.g., flow matching) do not reliably reflect model
            performance, and outliers in the validation data can cause unwanted effects.
        inference_variables : Sequence[str] or str, optional
            Variables for inference as a sequence of strings or a single string (default is None).
            Important for automating diagnostics!
        inference_conditions : Sequence[str] or str, optional
            Variables used as conditions for inference (default is None).
        summary_variables : Sequence[str] or str, optional
            Variables for summarizing data, if any (default is None).
        standardize : Sequence[str] or str, optional
            Variables to standardize during preprocessing (default is "inference_variables").
        **kwargs : dict, optional
            Additional arguments for configuring networks, adapters, optimizers, etc.
        """

        self.inference_network = find_inference_network(inference_network, **kwargs.get("inference_kwargs", {}))

        if summary_network is not None:
            self.summary_network = find_summary_network(summary_network, **kwargs.get("summary_kwargs", {}))
        else:
            self.summary_network = None

        self.simulator = simulator

        adapter = adapter or BasicWorkflow.default_adapter(
            inference_variables, inference_conditions, summary_variables, standardize
        )

        if isinstance(self.inference_network, PointInferenceNetwork):
            constructor = PointApproximator
        else:
            constructor = ContinuousApproximator

        self.approximator = constructor(
            inference_network=self.inference_network, summary_network=self.summary_network, adapter=adapter
        )

        self.initial_learning_rate = initial_learning_rate

        if isinstance(optimizer, type):
            self.optimizer = optimizer(initial_learning_rate, **kwargs.get("optimizer_kwargs", {}))
        else:
            self.optimizer = optimizer

        self.checkpoint_filepath = checkpoint_filepath
        self.checkpoint_name = checkpoint_name
        self.save_weights_only = save_weights_only
        self.save_best_only = save_best_only

        if self.checkpoint_filepath is not None:
            if self.save_weights_only:
                file_ext = self.checkpoint_name + ".weights.h5"
            else:
                file_ext = self.checkpoint_name + ".keras"

            checkpoint_full_filepath = os.path.join(self.checkpoint_filepath, file_ext)

            if os.path.exists(checkpoint_full_filepath):
                msg = (
                    f"Checkpoint file exists: '{checkpoint_full_filepath}'.\n"
                    "Existing checkpoints can _not_ be restored/loaded using this workflow. "
                    "Upon refitting, the checkpoints will be overwritten."
                )
                if not self.save_weights_only:
                    msg += (
                        " To load the stored approximator from the checkpoint, "
                        "use approximator = keras.saving.load_model(...)"
                    )
                logging.warning(msg)

        self.history = None

    @property
    def adapter(self):
        return self.approximator.adapter
    @staticmethod
    def samples_to_data_frame(samples: Mapping[str, np.ndarray]) -> pd.DataFrame:
        """
        Convert a dictionary of samples into a pandas DataFrame.

        Parameters
        ----------
        samples : Mapping[str, np.ndarray]
            A dictionary where keys represent variable names and values are arrays containing
            sampled data.

        Returns
        -------
        pd.DataFrame
            A pandas DataFrame where each column corresponds to a variable, and rows represent
            individual samples.
        """
        return pd.DataFrame(keras.tree.map_structure(np.squeeze, samples))
    @staticmethod
    def default_adapter(
        inference_variables: Sequence[str] | str,
        inference_conditions: Sequence[str] | str,
        summary_variables: Sequence[str] | str,
        standardize: Sequence[str] | str,
    ) -> Adapter:
        """
        Create a default adapter for processing inference variables, conditions, summaries,
        and standardization.

        - Converts all float64 values to float32 for computational efficiency.

        Parameters
        ----------
        inference_variables : Sequence[str] or str
            The variables to be treated as inference targets.
        inference_conditions : Sequence[str] or str
            The variables used as conditions for inference.
        summary_variables : Sequence[str] or str
            The variables used for summarization.
        standardize : Sequence[str] or str
            The variables to be standardized.

        Returns
        -------
        Adapter
            A configured Adapter instance that applies dtype conversion, concatenation,
            and optional standardization.
        """
        adapter = (
            Adapter()
            .convert_dtype(from_dtype="float64", to_dtype="float32")
            .concatenate(inference_variables, into="inference_variables")
        )

        if inference_conditions is not None:
            adapter = adapter.concatenate(inference_conditions, into="inference_conditions")
        if summary_variables is not None:
            adapter = adapter.concatenate(summary_variables, into="summary_variables")
        if standardize is not None:
            adapter = adapter.standardize(include=standardize)

        return adapter
    def simulate(self, batch_shape: Shape, **kwargs) -> dict[str, np.ndarray]:
        """
        Generates a batch of simulations using the provided simulator.

        Parameters
        ----------
        batch_shape : Shape
            The shape of the batch to be simulated. Typically an integer for simple simulators.
        **kwargs : dict, optional
            Additional keyword arguments passed to the simulator's sample method.

        Returns
        -------
        dict[str, np.ndarray]
            A dictionary where keys represent variable names and values are NumPy arrays containing
            the simulated variables.

        Raises
        ------
        RuntimeError
            If no simulator is provided.
        """
        if self.simulator is not None:
            return self.simulator.sample(batch_shape, **kwargs)
        raise RuntimeError("No simulator provided!")
    def simulate_adapted(self, batch_shape: Shape, **kwargs) -> dict[str, np.ndarray]:
        """
        Generates a batch of simulations and applies the adapter to the result.

        Parameters
        ----------
        batch_shape : Shape
            The shape of the batch to be simulated. Typically an integer for simple simulators.
        **kwargs : dict, optional
            Additional keyword arguments passed to the simulator's sample method.

        Returns
        -------
        dict[str, np.ndarray]
            A dictionary where keys represent variable names and values are NumPy arrays containing
            the adapted simulated variables.

        Raises
        ------
        RuntimeError
            If no simulator is provided.
        """
        if self.simulator is not None:
            return self.adapter(self.simulator.sample(batch_shape, **kwargs))
        raise RuntimeError("No simulator provided!")
    def sample(
        self,
        *,
        num_samples: int,
        conditions: Mapping[str, np.ndarray],
        **kwargs,
    ) -> dict[str, np.ndarray]:
        """
        Draws `num_samples` samples from the approximator given specified conditions.

        Parameters
        ----------
        num_samples : int
            The number of samples to generate.
        conditions : Mapping[str, np.ndarray]
            A dictionary where keys represent variable names and values are NumPy arrays containing
            the conditioning variables. Keys used as summary or inference conditions during training
            should be present.
        **kwargs : dict, optional
            Additional keyword arguments passed to the approximator's sampling function.

        Returns
        -------
        dict[str, np.ndarray]
            A dictionary where keys correspond to variable names and values are arrays containing
            the generated samples.
        """
        return self.approximator.sample(num_samples=num_samples, conditions=conditions, **kwargs)
    def estimate(
        self,
        *,
        conditions: Mapping[str, np.ndarray],
        **kwargs,
    ) -> dict[str, dict[str, np.ndarray | dict[str, np.ndarray]]]:
        """
        Estimates point summaries of inference variables based on specified conditions.

        Parameters
        ----------
        conditions : Mapping[str, np.ndarray]
            A dictionary mapping variable names to arrays representing the conditions for the
            estimation process.
        **kwargs
            Additional keyword arguments passed to underlying processing functions.

        Returns
        -------
        estimates : dict[str, dict[str, np.ndarray or dict[str, np.ndarray]]]
            The estimates of inference variables in a nested dictionary.

            1. Each first-level key is the name of an inference variable.
            2. Each second-level key is the name of a scoring rule.
            3. (If the scoring rule comprises multiple estimators, each third-level key is the name
               of an estimator.)

            Each estimator output (i.e., dictionary value that is not itself a dictionary) is an array
            of shape (num_datasets, point_estimate_size, variable_block_size).
        """
        return self.approximator.estimate(conditions=conditions, **kwargs)
    def log_prob(self, data: Mapping[str, np.ndarray], **kwargs) -> np.ndarray:
        """
        Compute the log probability of given variables under the approximator.

        Parameters
        ----------
        data : Mapping[str, np.ndarray]
            A dictionary where keys represent variable names and values are arrays corresponding to
            the variables' realizations.
        **kwargs : dict, optional
            Additional keyword arguments passed to the approximator's log probability function.

        Returns
        -------
        np.ndarray
            An array containing the log probabilities computed from the provided variables.
        """
        return self.approximator.log_prob(data=data, **kwargs)
    def plot_default_diagnostics(
        self,
        test_data: Mapping[str, np.ndarray] | int,
        num_samples: int = 1000,
        variable_keys: Sequence[str] = None,
        variable_names: Sequence[str] = None,
        **kwargs,
    ) -> dict[str, plt.Figure]:
        """
        Generates default diagnostic plots to evaluate the quality of inference.

        The function produces several diagnostic plots, including:

        - Loss history (if training history is available)
        - Parameter recovery plots
        - Calibration ECDF plots
        - Z-score contraction plots

        Parameters
        ----------
        test_data : Mapping[str, np.ndarray] or int
            A dictionary containing test data where keys represent variable names and values are
            corresponding data arrays. If an integer is provided, that number of test data sets will
            be generated using the simulator (if available).
        num_samples : int, optional
            The number of samples to draw from the approximator for diagnostics, by default 1000.
        variable_keys : list or None, optional, default: None
            Select keys from the dictionaries provided in estimates and targets. By default, select all keys.
        variable_names : list or None, optional, default: None
            The variable names for nice plot titles.
        **kwargs : dict, optional
            Additional keyword arguments:

            - `test_data_kwargs` : dict, optional
                Arguments to pass to the simulator when generating test data.
            - `approximator_kwargs` : dict, optional
                Arguments to pass to the approximator's sampling function.
            - `loss_kwargs` : dict, optional
                Arguments for customizing the loss plot.
            - `recovery_kwargs` : dict, optional
                Arguments for customizing the parameter recovery plot.
            - `calibration_ecdf_kwargs` : dict, optional
                Arguments for customizing the empirical cumulative distribution function (ECDF)
                calibration plot.
            - `z_score_contraction_kwargs` : dict, optional
                Arguments for customizing the z-score contraction plot.

        Returns
        -------
        dict[str, plt.Figure]
            A dictionary where keys correspond to different diagnostic plot types, and values are
            the respective matplotlib Figure objects.
        """

        samples, test_data = self._prepare_for_diagnostics(test_data, num_samples, **kwargs)

        figures = dict()

        if self.history is not None:
            figures["losses"] = bf_plots.loss(self.history, **kwargs.get("loss_kwargs", {}))

        plot_fns = {
            "recovery": bf_plots.recovery,
            "calibration_ecdf": bf_plots.calibration_ecdf,
            "z_score_contraction": bf_plots.z_score_contraction,
        }

        for k, plot_fn in plot_fns.items():
            figures[k] = plot_fn(
                estimates=samples,
                targets=test_data,
                variable_keys=variable_keys,
                variable_names=variable_names,
                **kwargs.get(f"{k}_kwargs", {}),
            )

        return figures
    def plot_custom_diagnostics(
        self,
        test_data: Mapping[str, np.ndarray] | int,
        plot_fns: Mapping[str, Callable],
        num_samples: int = 1000,
        variable_keys: Sequence[str] = None,
        variable_names: Sequence[str] = None,
        **kwargs,
    ) -> dict[str, plt.Figure]:
        """
        Generates custom diagnostic plots to evaluate the quality of inference.

        Each plotting function should have the signature

        - fn(samples, test_data, variable_keys=None, variable_names=None)

        and return a single matplotlib Figure object.

        Parameters
        ----------
        test_data : Mapping[str, np.ndarray] or int
            A dictionary containing test data where keys represent variable names and values are
            corresponding data arrays. If an integer is provided, that number of test data sets will
            be generated using the simulator (if available).
        plot_fns : Mapping[str, Callable]
            A dictionary containing custom plotting functions where keys represent the plot names and
            values correspond to the functions themselves, following the signature described above.
        num_samples : int, optional
            The number of samples to draw from the approximator for diagnostics, by default 1000.
        variable_keys : list or None, optional, default: None
            Select keys from the dictionaries provided in estimates and targets. By default, select all keys.
        variable_names : list or None, optional, default: None
            The variable names for nice plot titles.
        **kwargs : dict, optional
            Additional keyword arguments:

            - `test_data_kwargs` : dict, optional
                Arguments to pass to the simulator when generating test data.
            - `approximator_kwargs` : dict, optional
                Arguments to pass to the approximator's sampling function.

        Returns
        -------
        dict[str, plt.Figure]
            A dictionary where keys correspond to the names of the custom diagnostic plots, and
            values are the respective matplotlib Figure objects.
        """

        samples, test_data = self._prepare_for_diagnostics(test_data, num_samples, **kwargs)

        figures = dict()
        for key, plot_fn in plot_fns.items():
            figures[key] = plot_fn(samples, test_data, variable_keys=variable_keys, variable_names=variable_names)

        return figures
    def plot_diagnostics(self, **kwargs):
        logging.warning(
            "This function will be deprecated in future versions. Please use plot_default_diagnostics "
            "or plot_custom_diagnostics if you want to use your custom diagnostics."
        )
        return self.plot_default_diagnostics(**kwargs)
    def compute_default_diagnostics(
        self,
        test_data: Mapping[str, np.ndarray] | int,
        num_samples: int = 1000,
        variable_keys: Sequence[str] = None,
        variable_names: Sequence[str] = None,
        as_data_frame: bool = True,
        **kwargs,
    ) -> Sequence[dict] | pd.DataFrame:
        """
        Computes default diagnostic metrics to evaluate the quality of inference.

        The function computes several diagnostic metrics, including:

        - Root Mean Squared Error (RMSE)
        - Posterior contraction
        - Calibration error

        Parameters
        ----------
        test_data : Mapping[str, np.ndarray] or int
            A dictionary containing test data where keys represent variable names and values are
            corresponding realizations. If an integer is provided, that number of test data sets will
            be generated using the simulator (if available).
        num_samples : int, optional
            The number of samples to draw from the approximator for diagnostics, by default 1000.
        variable_keys : list or None, optional, default: None
            Select keys from the dictionaries provided in estimates and targets. By default, select all keys.
        variable_names : list or None, optional, default: None
            The variable names for nice table headings.
        as_data_frame : bool, optional
            Whether to return the results as a pandas DataFrame (default: True). If False, a sequence
            of dictionaries with metric values is returned.
        **kwargs : dict, optional
            Additional keyword arguments:

            - `test_data_kwargs` : dict, optional
                Arguments to pass to the simulator when generating test data.
            - `approximator_kwargs` : dict, optional
                Arguments to pass to the approximator's sampling function.
            - `root_mean_squared_error_kwargs` : dict, optional
                Arguments for customizing the RMSE computation.
            - `posterior_contraction_kwargs` : dict, optional
                Arguments for customizing the posterior contraction computation.
            - `calibration_error_kwargs` : dict, optional
                Arguments for customizing the calibration error computation.

        Returns
        -------
        Sequence[dict] or pd.DataFrame
            If `as_data_frame` is True, returns a pandas DataFrame containing the computed diagnostic
            metrics for each variable. Otherwise, returns a sequence of dictionaries with metric values.
        """

        samples, test_data = self._prepare_for_diagnostics(test_data, num_samples, **kwargs)

        root_mean_squared_error = bf_metrics.root_mean_squared_error(
            estimates=samples,
            targets=test_data,
            variable_keys=variable_keys,
            variable_names=variable_names,
            **kwargs.get("root_mean_squared_error_kwargs", {}),
        )

        contraction = bf_metrics.posterior_contraction(
            estimates=samples,
            targets=test_data,
            variable_keys=variable_keys,
            variable_names=variable_names,
            **kwargs.get("posterior_contraction_kwargs", {}),
        )

        calibration_errors = bf_metrics.calibration_error(
            estimates=samples,
            targets=test_data,
            variable_keys=variable_keys,
            variable_names=variable_names,
            **kwargs.get("calibration_error_kwargs", {}),
        )

        if as_data_frame:
            metrics = pd.DataFrame(
                {
                    root_mean_squared_error["metric_name"]: root_mean_squared_error["values"],
                    contraction["metric_name"]: contraction["values"],
                    calibration_errors["metric_name"]: calibration_errors["values"],
                },
                index=variable_keys or root_mean_squared_error["variable_names"],
            ).T
        else:
            metrics = (root_mean_squared_error, contraction, calibration_errors)

        return metrics
    def compute_custom_diagnostics(
        self,
        test_data: Mapping[str, np.ndarray] | int,
        metrics: Mapping[str, Callable],
        num_samples: int = 1000,
        variable_keys: Sequence[str] = None,
        variable_names: Sequence[str] = None,
        as_data_frame: bool = True,
        **kwargs,
    ) -> Mapping[str, np.ndarray] | pd.DataFrame:
        """
        Computes custom diagnostic metrics to evaluate the quality of inference.

        Each metric function should have the signature

        - metric_fn(samples, test_data, variable_keys=None, variable_names=None)

        and return a dictionary containing the metric name under the 'name' key and the metric
        values under the 'values' key.

        Parameters
        ----------
        test_data : Mapping[str, np.ndarray] or int
            A dictionary containing test data where keys represent variable names and values are
            corresponding realizations. If an integer is provided, that number of test data sets will
            be generated using the simulator (if available).
        metrics : Mapping[str, Callable]
            A dictionary containing custom metric functions where keys represent the metric names and
            values correspond to the functions themselves, following the signature described above.
        num_samples : int, optional
            The number of samples to draw from the approximator for diagnostics, by default 1000.
        variable_keys : list or None, optional, default: None
            Select keys from the dictionaries provided in estimates and targets. By default, select all keys.
        variable_names : list or None, optional, default: None
            The variable names for nice table headings.
        as_data_frame : bool, optional
            Whether to return the results as a pandas DataFrame (default: True). If False, a
            dictionary mapping metric names to metric values is returned.
        **kwargs : dict, optional
            Additional keyword arguments:

            - `test_data_kwargs` : dict, optional
                Arguments to pass to the simulator when generating test data.
            - `approximator_kwargs` : dict, optional
                Arguments to pass to the approximator's sampling function.

        Returns
        -------
        Mapping[str, np.ndarray] or pd.DataFrame
            If `as_data_frame` is True, returns a pandas DataFrame containing the computed diagnostic
            metrics for each variable. Otherwise, returns a dictionary mapping metric names to metric
            values.
        """

        samples, test_data = self._prepare_for_diagnostics(test_data, num_samples, **kwargs)

        metrics_dict = {}
        for key, metric_fn in metrics.items():
            metric = metric_fn(samples, test_data, variable_keys=variable_keys, variable_names=variable_names)
            metrics_dict[metric["name"]] = metric["values"]

        if as_data_frame:
            return pd.DataFrame(metrics_dict, index=variable_names)

        return metrics_dict
    def compute_diagnostics(self, **kwargs):
        logging.warning(
            "This function will be deprecated in future versions. Please use compute_default_diagnostics "
            "or compute_custom_diagnostics if you want to use your own metrics."
        )
        return self.compute_default_diagnostics(**kwargs)
    def fit_offline(
        self,
        data: Mapping[str, np.ndarray],
        epochs: int = 100,
        batch_size: int = 32,
        keep_optimizer: bool = False,
        validation_data: Mapping[str, np.ndarray] | int = None,
        **kwargs,
    ) -> keras.callbacks.History:
        """
        Train the approximator offline using a fixed dataset. This approach will be faster than online
        training, since no computation time is spent generating new data for each batch, but it assumes
        that all simulations fit in memory.

        Parameters
        ----------
        data : Mapping[str, np.ndarray]
            A dictionary containing training data where keys represent variable names and values are
            corresponding realizations.
        epochs : int, optional
            The number of training epochs, by default 100. Consider increasing this number for
            free-form inference networks, such as FlowMatching or ConsistencyModel, which generally
            need more epochs than CouplingFlows.
        batch_size : int, optional
            The batch size used for training, by default 32.
        keep_optimizer : bool, optional
            Whether to retain the current state of the optimizer after training, by default False.
        validation_data : Mapping[str, np.ndarray] or int, optional
            A dictionary containing validation data. If an integer is provided, that number of
            validation samples will be generated using the simulator (if available). By default, no
            validation data is used.
        **kwargs : dict, optional
            Additional keyword arguments passed to the underlying `_fit` method.

        Returns
        -------
        history : keras.callbacks.History
            A history object containing the training history, where keys correspond to logged metrics
            (e.g., loss values) and values are arrays tracking the metric evolution over epochs.
        """

        dataset = OfflineDataset(data=data, batch_size=batch_size, adapter=self.adapter)

        return self._fit(
            dataset, epochs, strategy="offline", keep_optimizer=keep_optimizer, validation_data=validation_data, **kwargs
        )
    def fit_online(
        self,
        epochs: int = 100,
        num_batches_per_epoch: int = 100,
        batch_size: int = 32,
        keep_optimizer: bool = False,
        validation_data: Mapping[str, np.ndarray] | int = None,
        **kwargs,
    ) -> keras.callbacks.History:
        """
        Train the approximator using an online data-generating process. The dataset is dynamically
        generated during training, making this approach suitable for scenarios where generating new
        simulations is computationally cheap.

        Parameters
        ----------
        epochs : int, optional
            The number of training epochs, by default 100.
        num_batches_per_epoch : int, optional
            The number of batches generated per epoch, by default 100.
        batch_size : int, optional
            The batch size used for training, by default 32.
        keep_optimizer : bool, optional
            Whether to retain the current state of the optimizer after training, by default False.
        validation_data : Mapping[str, np.ndarray] or int, optional
            A dictionary containing validation data. If an integer is provided, that number of
            validation samples will be generated using the simulator (if available). By default, no
            validation data is used.
        **kwargs : dict, optional
            Additional keyword arguments passed to the underlying `_fit` method.

        Returns
        -------
        history : keras.callbacks.History
            A history object containing the training history, where keys correspond to logged metrics
            (e.g., loss values) and values are arrays tracking the metric evolution over epochs.
        """

        dataset = OnlineDataset(
            simulator=self.simulator, batch_size=batch_size, num_batches=num_batches_per_epoch, adapter=self.adapter
        )

        return self._fit(
            dataset, epochs, strategy="online", keep_optimizer=keep_optimizer, validation_data=validation_data, **kwargs
        )
    def fit_disk(
        self,
        root: os.PathLike,
        pattern: str = "*.pkl",
        batch_size: int = 32,
        load_fn: callable = None,
        epochs: int = 100,
        keep_optimizer: bool = False,
        validation_data: Mapping[str, np.ndarray] | int = None,
        **kwargs,
    ) -> keras.callbacks.History:
        """
        Train the approximator using data stored on disk. This approach is suitable for large sets of
        simulations that don't fit in memory.

        Parameters
        ----------
        root : os.PathLike
            The root directory containing the dataset files.
        pattern : str, optional
            A filename pattern to match dataset files, by default ``"*.pkl"``.
        batch_size : int, optional
            The batch size used for training, by default 32.
        load_fn : callable, optional
            A function to load dataset files. If None, a default loading function is used.
        epochs : int, optional
            The number of training epochs, by default 100. Consider increasing this number for
            free-form inference networks, such as FlowMatching or ConsistencyModel, which generally
            need more epochs than CouplingFlows.
        keep_optimizer : bool, optional
            Whether to retain the current state of the optimizer after training, by default False.
        validation_data : Mapping[str, np.ndarray] or int, optional
            A dictionary containing validation data. If an integer is provided, that number of
            validation samples will be generated using the simulator (if available). By default, no
            validation data is used.
        **kwargs : dict, optional
            Additional keyword arguments passed to the underlying `_fit` method.

        Returns
        -------
        history : keras.callbacks.History
            A history object containing the training history, where keys correspond to logged metrics
            (e.g., loss values) and values are arrays tracking the metric evolution over epochs.
        """

        dataset = DiskDataset(root=root, pattern=pattern, batch_size=batch_size, load_fn=load_fn, adapter=self.adapter)

        return self._fit(
            dataset, epochs, strategy="offline", keep_optimizer=keep_optimizer, validation_data=validation_data, **kwargs
        )
    def build_optimizer(self, epochs: int, num_batches: int, strategy: str) -> keras.Optimizer | None:
        """
        Build and initialize the optimizer based on the training strategy.

        Uses a cosine decay learning rate schedule with a brief linear warmup: the learning rate is
        warmed up to `initial_learning_rate` over the first epoch and then decays over the remaining
        training steps.

        Parameters
        ----------
        epochs : int
            The total number of training epochs.
        num_batches : int
            The number of batches per epoch.
        strategy : str
            The training strategy, either "online" or another mode that applies weight decay.
            For "online" training, an Adam optimizer with gradient clipping is used.
            For other strategies, AdamW is used with weight decay to encourage regularization.

        Returns
        -------
        keras.Optimizer or None
            None if an optimizer was already set. Otherwise, the optimizer is built and stored on
            ``self.optimizer``.
        """
        if self.optimizer is not None:
            return

        # Default case
        learning_rate = keras.optimizers.schedules.CosineDecay(
            initial_learning_rate=0.5 * self.initial_learning_rate,
            warmup_target=self.initial_learning_rate,
            warmup_steps=num_batches,
            decay_steps=epochs * num_batches,
            alpha=0,
        )

        # Use Adam for online learning, apply weight decay otherwise
        if strategy.lower() == "online":
            self.optimizer = keras.optimizers.Adam(learning_rate, clipnorm=1.5)
        else:
            self.optimizer = keras.optimizers.AdamW(learning_rate, weight_decay=1e-3, clipnorm=1.5)
    def _fit(
        self,
        dataset: keras.utils.PyDataset,
        epochs: int,
        strategy: str,
        keep_optimizer: bool,
        validation_data: Mapping[str, np.ndarray] | int,
        **kwargs,
    ) -> keras.callbacks.History:
        if validation_data is not None:
            if isinstance(validation_data, int) and self.simulator is not None:
                validation_data = self.simulator.sample(validation_data, **kwargs.pop("validation_data_kwargs", {}))
            elif isinstance(validation_data, int):
                raise ValueError(f"No simulator found for generating {validation_data} data sets.")

            validation_data = OfflineDataset(data=validation_data, batch_size=dataset.batch_size, adapter=self.adapter)
            monitor = "val_loss"
        else:
            monitor = "loss"

        if self.checkpoint_filepath is not None:
            if self.save_weights_only:
                file_ext = self.checkpoint_name + ".weights.h5"
            else:
                file_ext = self.checkpoint_name + ".keras"

            model_checkpoint_callback = keras.callbacks.ModelCheckpoint(
                filepath=os.path.join(self.checkpoint_filepath, file_ext),
                monitor=monitor,
                mode="min",
                save_best_only=self.save_best_only,
                save_weights_only=self.save_weights_only,
                save_freq="epoch",
            )

            if kwargs.get("callbacks") is not None:
                kwargs["callbacks"].append(model_checkpoint_callback)
            else:
                kwargs["callbacks"] = [model_checkpoint_callback]

        self.build_optimizer(epochs, dataset.num_batches, strategy=strategy)

        if not self.approximator.built:
            self.approximator.compile(optimizer=self.optimizer, metrics=kwargs.pop("metrics", None))

        try:
            self.history = self.approximator.fit(
                dataset=dataset, epochs=epochs, validation_data=validation_data, **kwargs
            )
            self._on_training_finished()
            return self.history
        finally:
            if not keep_optimizer:
                self.optimizer = None

    def _prepare_for_diagnostics(self, test_data: Mapping[str, np.ndarray] | int, num_samples: int = 1000, **kwargs):
        if isinstance(test_data, int) and self.simulator is not None:
            test_data = self.simulator.sample(test_data, **kwargs.pop("test_data_kwargs", {}))
        elif isinstance(test_data, int):
            raise ValueError(f"No simulator found for generating {test_data} data sets.")

        samples = self.approximator.sample(
            num_samples=num_samples, conditions=test_data, **kwargs.get("approximator_kwargs", {})
        )

        return samples, test_data

    def _on_training_finished(self):
        if self.checkpoint_filepath is not None:
            if self.save_weights_only:
                file_ext = self.checkpoint_name + ".weights.h5"
            else:
                file_ext = self.checkpoint_name + ".keras"

            # file_ext already contains the checkpoint name, so only the directory needs to be joined
            logging.info(
                f"Training is now finished. You can find the trained approximator at "
                f"'{os.path.join(self.checkpoint_filepath, file_ext)}'. "
                f"To load it, use approximator = keras.saving.load_model(...)."
            )
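

For orientation, here is a minimal usage sketch of the workflow defined above. It is not part of the module. The simulator, its variable names ("parameters", "observables"), and all epoch/batch settings are illustrative assumptions; `bf.make_simulator` is used here only as one convenient way to obtain a `Simulator` instance, and any other `Simulator` works the same way.

# Minimal usage sketch (assumptions: a toy simulator whose outputs are named
# "parameters" and "observables"; adjust names and settings to your own model).
import numpy as np
import bayesflow as bf


def prior():
    # One draw of the parameters
    return dict(parameters=np.random.normal(size=2).astype(np.float32))


def likelihood(parameters):
    # One simulated observation given the parameters
    return dict(observables=(parameters + np.random.normal(scale=0.1, size=2)).astype(np.float32))


# Compose the two functions into a Simulator (assumed helper; any Simulator instance works)
simulator = bf.make_simulator([prior, likelihood])

workflow = bf.BasicWorkflow(
    simulator=simulator,
    inference_network="coupling_flow",
    inference_variables="parameters",
    inference_conditions="observables",
)

# Either train online ...
history = workflow.fit_online(epochs=10, num_batches_per_epoch=50, batch_size=64)

# ... or simulate once and train offline on the fixed data set
training_data = workflow.simulate(2000)
history = workflow.fit_offline(training_data, epochs=30, batch_size=64, validation_data=200)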
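

Once training has finished, the diagnostic and sampling methods above can be driven in the same way. The snippet below continues the sketch; the numbers of test data sets and posterior draws are arbitrary choices, not defaults of the class.

# Default diagnostics on 300 freshly simulated test data sets
figures = workflow.plot_default_diagnostics(test_data=300, num_samples=500)
metrics = workflow.compute_default_diagnostics(test_data=300, num_samples=500)
print(metrics)

# Posterior samples for new (here: simulated) observations; the condition keys must match
# those used as inference/summary conditions during training.
new_data = workflow.simulate(5)
posterior_samples = workflow.sample(num_samples=2000, conditions=new_data)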