"""Implementation of the GenerativeConditionalImputer."""
from __future__ import annotations
import warnings
from typing import TYPE_CHECKING
import numpy as np
from shapiq.approximator.sampling import CoalitionSampler
from shapiq.utils.modules import check_import_module
from .base import Imputer
if TYPE_CHECKING:
from typing import Literal
from shapiq.typing import Model
[docs]
class GenerativeConditionalImputer(Imputer):
    """A GenerativeConditionalImputer for the shapiq package.

    The GenerativeConditionalImputer is used to impute the missing values of a data point by using
    the conditional distribution estimated with the background data.

    Attributes:
        empty_prediction: The mean model prediction over the background data, used as the value
            of the empty coalition (all features missing).
        method: The imputation method. Always ``"generative"``.
        conditional_budget: The number of coalitions sampled per background point when fitting
            the tree embedder (capped at ``2**n_features``).
        conditional_threshold: The quantile of embedding distances that defines the neighbourhood
            from which replacement samples are drawn.
    """

    def __init__(
        self,
        model: Model,
        data: np.ndarray,
        x: np.ndarray | None = None,
        *,
        sample_size: int = 10,
        conditional_budget: int = 128,
        conditional_threshold: float = 0.05,
        normalize: bool = True,
        categorical_features: list[int] | None = None,
        method: Literal["generative"] = "generative",
        random_state: int | None = None,
    ) -> None:
        """Initializes the GenerativeConditionalImputer.

        Args:
            model: The model to explain as a callable function expecting data points as input and
                returning the model's predictions.
            data: The background data to use for the explainer as a two-dimensional array with
                shape ``(n_samples, n_features)``.
            x: The explanation point to use the imputer on.
            sample_size: The number of samples to draw from the conditional background data for
                imputation. Defaults to ``10``.
            conditional_budget: The number of coalitions to sample per each point in ``data`` for
                training the generative model. Defaults to ``128``.
            conditional_threshold: A quantile threshold defining a neighbourhood of samples to
                draw ``sample_size`` from. A value between ``0.0`` and ``1.0``. Defaults to
                ``0.05``.
            normalize: A flag to normalize the game values. If ``True`` (default), then the game
                values are normalized and centered to be zero for the empty set of features.
                Defaults to ``True``.
            categorical_features: A list of indices of the categorical features in the background
                data. Currently unused.
            method: The method to use for the GenerativeConditionalImputer. Currently only
                ``"generative"`` is implemented. Defaults to ``"generative"``.
            random_state: The random state to use for sampling. Defaults to ``None``.

        Raises:
            ValueError: If ``method`` is not ``"generative"``.
        """
        super().__init__(
            model=model,
            data=data,
            x=x,
            sample_size=sample_size,
            categorical_features=categorical_features,
            random_state=random_state,
        )
        if method != "generative":
            msg = "Currently only a generative GenerativeConditionalImputer is implemented."
            raise ValueError(msg)
        self.method = method
        self.conditional_budget = conditional_budget
        self.conditional_threshold = conditional_threshold
        self.init_background(data=data)
        # set empty value and normalization
        self.empty_prediction: float = self.calc_empty_prediction()
        if normalize:
            self.normalization_value = self.empty_prediction

    def init_background(self, data: np.ndarray) -> GenerativeConditionalImputer:
        """Fits the tree embedder on randomly masked copies of the background data.

        Every background point is repeated ``conditional_budget`` times, entries selected by
        freshly sampled coalition matrices are replaced with ``NaN``, and an
        ``xgboost.XGBRegressor`` is fitted to reconstruct the unmasked rows from the masked ones.
        The result of ``tree_embedder.apply(data)`` is stored as the embedding of the background
        data and later compared against the embedding of the explanation point.

        Args:
            data: The background data to use for the imputer. The shape of the array must
                be ``(n_samples, n_features)``.

        Returns:
            The initialized imputer.

        Warns:
            UserWarning: If ``conditional_budget`` exceeds ``2**n_features``; the budget is then
                reduced to ``2**n_features``.
        """
        check_import_module("xgboost")
        import xgboost

        n_features = data.shape[1]
        max_budget = 2**n_features
        if self.conditional_budget > max_budget:
            warnings.warn(
                "`conditional_budget` is higher than `2**n_features`; setting "
                "`conditional_budget = 2**n_features`",
                stacklevel=2,
            )
            self.conditional_budget = max_budget

        X_tiled = np.repeat(data, repeats=self.conditional_budget, axis=0)
        coalition_sampler = CoalitionSampler(
            n_players=n_features,
            sampling_weights=np.full(n_features + 1, 1e-7),
            random_state=self.random_state,
        )
        # draw an independent set of coalitions for every background point
        sampled_masks = []
        for _ in range(data.shape[0]):
            coalition_sampler.sample(self.conditional_budget)
            sampled_masks.append(coalition_sampler.coalitions_matrix)
        coalitions_matrix = np.concatenate(sampled_masks, axis=0)

        # NaN can only be stored in a floating-point array; integer (or boolean) background data
        # would otherwise raise or be silently coerced when masked, so cast those to float first.
        if np.issubdtype(X_tiled.dtype, np.floating):
            X_masked = X_tiled.copy()
        else:
            X_masked = X_tiled.astype(float)
        X_masked[coalitions_matrix] = np.nan

        tree_embedder = xgboost.XGBRegressor(  # ty: ignore[possibly-missing-attribute]
            random_state=self.random_state
        )
        tree_embedder.fit(X_masked, X_tiled)
        self._data_embedded = tree_embedder.apply(data)
        self._tree_embedder = tree_embedder
        self._coalition_sampler = coalition_sampler
        return self

    def value_function(self, coalitions: np.ndarray) -> np.ndarray:
        """Computes the value function for all coalitions.

        For every coalition the explanation point is copied once per sampled background row, the
        missing features are overwritten with that row's values, and the model predictions are
        averaged over the background rows.

        Args:
            coalitions: A boolean array indicating which features are present (``True``) and
                which are missing (``False``). The shape of the array must be
                ``(n_subsets, n_features)``.

        Returns:
            The averaged model predictions on the imputed data points, one value per coalition.
            Coalitions without any present feature receive ``empty_prediction`` instead.
        """
        background_data = self._sample_background_data()
        n_coalitions = coalitions.shape[0]
        n_samples = background_data.shape[0]
        # Row layout: coalition 0 paired with every background row, then coalition 1, and so on.
        x_tiled = np.tile(self.x, (n_coalitions * n_samples, 1))
        background_data_tiled = np.tile(background_data, (n_coalitions, 1))
        coalitions_tiled = np.repeat(coalitions, n_samples, axis=0)
        missing = ~coalitions_tiled
        x_tiled[missing] = background_data_tiled[missing]
        predictions = self.predict(x_tiled)
        avg_predictions = predictions.reshape(n_coalitions, -1).mean(axis=1)
        # insert the better approximate empty prediction for the empty coalitions
        avg_predictions[~np.any(coalitions, axis=1)] = self.empty_prediction
        return avg_predictions

    def _sample_background_data(self) -> np.ndarray:
        """Samples background data from the neighbourhood of the explanation point.

        The neighbourhood consists of the background rows whose embedding distance to the
        explanation point does not exceed the ``conditional_threshold`` quantile of all distances.

        Returns:
            The sampled replacement values as rows of the background data. At most
            ``sample_size`` rows are returned; if the neighbourhood is not larger than
            ``sample_size``, the whole neighbourhood is returned.
        """
        x_embedded = self._tree_embedder.apply(self._x)
        distances = hamming_distance(self._data_embedded, x_embedded)
        cutoff = np.quantile(distances, self.conditional_threshold)
        conditional_data = self.data[distances <= cutoff]
        if self.sample_size < conditional_data.shape[0]:
            idc = self._rng.choice(conditional_data.shape[0], size=self.sample_size, replace=False)
            return conditional_data[idc, :]
        return conditional_data

    def calc_empty_prediction(self) -> float:
        """Computes the empty prediction as the mean model output over the background data.

        Returns:
            The empty prediction.
        """
        empty_predictions = self.predict(self.data)
        return float(np.mean(empty_predictions))
def hamming_distance(X: np.ndarray, x: np.ndarray) -> np.ndarray:
    """Compute the Hamming distance between a point ``x`` and every row of ``X``.

    Args:
        X: A two-dimensional array of shape ``(n_points, n_dims)``.
        x: The reference point, either one-dimensional with shape ``(n_dims,)`` or a single row
            with shape ``(1, n_dims)``.

    Returns:
        A one-dimensional array of length ``n_points`` whose ``i``-th entry is the number of
        positions in which ``X[i]`` differs from ``x``.

    References:
        - https://en.wikipedia.org/wiki/Hamming_distance
    """
    # Broadcasting compares ``x`` with each row directly, so no tiled copy of ``x`` is needed.
    return np.sum(X != x, axis=1)