Source code for shapiq.imputer.marginal_imputer

"""Implementation of the marginal imputer."""

from __future__ import annotations

import warnings
from typing import TYPE_CHECKING

import numpy as np

from .base import Imputer

if TYPE_CHECKING:
    from shapiq.typing import CoalitionMatrix, GameValues, Model

_too_large_sample_size_warning = (
    "The sample size is larger than the number of data points in the background set. "
    "Reducing the sample size to the number of background samples."
)


[docs] class MarginalImputer(Imputer): """The marginal imputer for the shapiq package. The marginal imputer replaces missing features of the explanation point ``x`` by values sampled from the background data. When ``joint_marginal_distribution=True``, rows are sampled **jointly** (i.e., from the empirical joint marginal); when ``False``, each feature column is independently shuffled to break dependencies (feature-wise marginals). This corresponds to *interventional* imputation (often called *marginal fANOVA* in the literature), as opposed to *observational* imputers that condition on observed features. Examples: >>> model = lambda x: np.sum(x, axis=1) # some dummy model >>> data = np.random.rand(1000, 4) # some background data >>> x_to_impute = np.array([[1, 1, 1, 1]]) # some data point to impute >>> imputer = MarginalImputer(model=model, data=data, x=x_to_impute, sample_size=100, random_state=42) >>> # get the model prediction with missing values >>> imputer(np.array([[True, False, True, False]])) np.array([2.01]) # some model prediction (might be different) >>> # exchange the background data >>> new_data = np.random.rand(1000, 4) >>> imputer.init_background(data=new_data) See Also: - :class:`shapiq.imputer.ConditionalImputer` for the conditional imputer. - :class:`shapiq.imputer.BaselineImputer` for the baseline imputer. - :class:`shapiq.imputer.base.Imputer` for the base imputer class. """ joint_marginal_distribution: bool """A flag indicating whether to sample from the joint marginal distribution (``True``) or independently for each feature (``False``).""" def __init__( self, model: Model, data: np.ndarray, *, x: np.ndarray | None = None, sample_size: int = 100, categorical_features: list[int] | None = None, joint_marginal_distribution: bool = True, normalize: bool = True, random_state: int | None = None, ) -> None: """Initializes the marginal imputer. Args: model: The model to explain as a callable function expecting a data points as input and returning the model's predictions. data: The background data to use for the explainer as a two-dimensional array with shape ``(n_samples, n_features)``. x: The explanation point to use the imputer on either as a 2-dimensional array with shape ``(1, n_features)`` or as a vector with shape ``(n_features,)``. If ``None``, the imputer must be fitted before it can be used. sample_size: The number of samples to draw from the background data. Increasing this value will linearly increase the runtime of the explainer. categorical_features: A list of indices of the categorical features. If ``None``, all features are treated as continuous. joint_marginal_distribution: A flag to sample the replacement values from the joint marginal distribution. If ``False``, the replacement values are sampled independently for each feature. If ``True``, the replacement values are sampled from the joint marginal distribution. normalize: A flag to normalize the game values. If ``True``, then the game values are normalized and centered to be zero for the empty set of features. random_state: The random state to use for sampling. If ``None``, the random state is not fixed. """ super().__init__( model=model, data=data, x=x, sample_size=sample_size, categorical_features=categorical_features, random_state=random_state, ) # setup attributes self.joint_marginal_distribution = joint_marginal_distribution self._replacement_data: np.ndarray = np.zeros((1, self.n_features)) self.init_background(self.data) if normalize: # update normalization value self.normalization_value = self.empty_prediction
[docs] def value_function(self, coalitions: CoalitionMatrix) -> GameValues: """Imputes the missing values of a data point and calls the model. Args: coalitions: A boolean array indicating which features are present (``True``) and which are missing (``False``). The shape of the array must be ``(n_subsets, n_features)``. Returns: The model's predictions on the imputed data points. The shape of the array is ``(n_subsets, n_outputs)``. """ n_coalitions = coalitions.shape[0] replacement_data = self._sample_replacement_data(self.sample_size) sample_size = replacement_data.shape[0] outputs = np.zeros((sample_size, n_coalitions)) imputed_data = np.tile(self.x, (n_coalitions, 1)) for i in range(self.sample_size): replacements = np.tile(replacement_data[i], (n_coalitions, 1)) imputed_data[~coalitions] = replacements[~coalitions] predictions = self.predict(imputed_data) outputs[i] = predictions outputs = np.mean(outputs, axis=0) # average over the samples # insert the better approximate empty prediction for the empty coalitions outputs[~np.any(coalitions, axis=1)] = self.empty_prediction return outputs
[docs] def init_background(self, data: np.ndarray) -> MarginalImputer: """Initializes the imputer to a background data set. The background data is used to sample replacement values for the missing features. To change the background data, use this method. Args: data: The background data to use for the imputer. The shape of the array must be ``(n_samples, n_features)``. Returns: The initialized imputer. Examples: >>> model = lambda x: np.sum(x, axis=1) >>> data = np.random.rand(10, 3) >>> imputer = MarginalImputer(model=model, data=data, x=data[0]) >>> new_data = np.random.rand(10, 3) >>> imputer.init_background(data=new_data) Raises: UserWarning: If the sample size is larger than the number of data points in the background data. In this case, the sample size is reduced to the number of data points in the background data. """ self._replacement_data = np.copy(data) if self._sample_size > self._replacement_data.shape[0]: warnings.warn(UserWarning(_too_large_sample_size_warning), stacklevel=2) self._sample_size = self._replacement_data.shape[0] self.calc_empty_prediction() # reset the empty prediction to the new background data return self
def _sample_replacement_data(self, sample_size: int | None = None) -> np.ndarray: """Samples replacement values from the background data. Args: sample_size: The number of replacement values to sample. If ``None``, all replacement values are sampled. Defaults to ``None``. Returns: The replacement values as a two-dimensional array with shape ``(sample_size, n_features)``. """ replacement_data = np.copy(self._replacement_data) rng = np.random.default_rng(self.random_state) # shuffle data if not sampling from joint marginal distribution if not self.joint_marginal_distribution: for feature in range(self.n_features): rng.shuffle(replacement_data[:, feature]) n_samples = replacement_data.shape[0] if sample_size is None or sample_size >= n_samples: return replacement_data # sample replacement values replacement_idx = rng.choice(n_samples, size=sample_size, replace=False) return replacement_data[replacement_idx]
[docs] def calc_empty_prediction(self) -> float: """Runs the model on empty data points (all features missing) to get the empty prediction. Returns: The empty prediction of the model provided only missing features. """ background_data = self._sample_replacement_data() empty_predictions = self.predict(background_data) empty_prediction = float(np.mean(empty_predictions)) self.empty_prediction = empty_prediction if self.normalize: # reset the normalization value self.normalization_value = empty_prediction return empty_prediction