pydvl.value.shapley.classwise

Class-wise Shapley (Schoch et al., 2022)1 offers a Shapley framework tailored for classification problems. Let \(D\) be a dataset, \(D_{y_i}\) the subset of \(D\) with label \(y_i\), and \(D_{-y_i}\) its complement in \(D\). The key idea is that a sample \((x_i, y_i)\) might enhance the overall performance on \(D\), while being detrimental to the performance on \(D_{y_i}\). The Class-wise value is defined as:

\[ v_u(i) = \frac{1}{2^{|D_{-y_i}|}} \sum_{S_{-y_i}} \frac{1}{|D_{y_i}|} \sum_{S_{y_i}} \binom{|D_{y_i}|-1}{|S_{y_i}|}^{-1} [u( S_{y_i} \cup \{i\} | S_{-y_i} ) - u( S_{y_i} | S_{-y_i})], \]

where \(S_{y_i} \subseteq D_{y_i} \setminus \{i\}\) and \(S_{-y_i} \subseteq D_{-y_i}\). In words: the inner sum is the Shapley value of \(i\) restricted to its own class, conditioned on a fixed out-of-class set \(S_{-y_i}\), and the outer sum averages it uniformly over all subsets of \(D_{-y_i}\).

Analysis of Class-wise Shapley

For a detailed analysis of the method, with comparison to other valuation techniques, please refer to the main documentation.

In practice, the quantity above is estimated using Monte Carlo sampling of the powerset and the set of index permutations. This results in the estimator

\[ v_u(i) = \frac{1}{K} \sum_k \frac{1}{L} \sum_l [u(\sigma^{(l)}_{:i} \cup \{i\} | S^{(k)} ) - u( \sigma^{(l)}_{:i} | S^{(k)})], \]

with \(S^{(1)}, \dots, S^{(K)} \subseteq T_{-y_i},\) \(\sigma^{(1)}, \dots, \sigma^{(L)} \in \Pi(T_{y_i}),\) and \(\sigma^{(l)}_{:i}\) denoting the set of indices in permutation \(\sigma^{(l)}\) before the position where \(i\) appears. The sets \(T_{y_i}\) and \(T_{-y_i}\) are the subsets of the training set with label \(y_i\) and with any other label, respectively.
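
To make the structure of this estimator concrete, here is a minimal, self-contained sketch. It is not pyDVL's implementation: the utility u is a toy stand-in for retraining and scoring a model, and all names are illustrative.

import random

# Toy stand-in for u(S_{y_i} | S_{-y_i}): a real utility would retrain a
# model on the union of both sets and return a class-wise score.
def u(s_in: frozenset, s_out: frozenset) -> float:
    return len(s_in) + 0.5 * len(s_out)

def classwise_value(i, t_in, t_out, n_sets=100, n_perms=100, seed=0):
    """Two-level Monte Carlo estimate of v_u(i): draw K subsets S^(k)
    uniformly from the powerset of t_out, and L permutations of t_in."""
    rng = random.Random(seed)
    total = 0.0
    for _ in range(n_sets):  # K outer samples
        s_out = frozenset(j for j in t_out if rng.random() < 0.5)
        for _ in range(n_perms):  # L inner samples
            sigma = list(t_in)
            rng.shuffle(sigma)
            prefix = frozenset(sigma[: sigma.index(i)])  # sigma_{:i}
            total += u(prefix | {i}, s_out) - u(prefix, s_out)
    return total / (n_sets * n_perms)

print(classwise_value(0, t_in={0, 1, 2}, t_out={3, 4}))  # 1.0 for this toy u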

Notes for derivation of test cases

The unit tests include the following manually constructed data: let \(D=\{(1,0),(2,0),(3,0),(4,1)\}\) be the test set and \(T=\{(1,0),(2,0),(3,1),(4,1)\}\) the train set. This specific dataset is chosen because it allows fitting the model

\[y = \max(0, \min(1, \text{round}(\beta^T x)))\]

in closed form as \(\beta = \frac{\text{dot}(x, y)}{\text{dot}(x, x)}\). From the closed-form solution, the tables for the in-class accuracy \(a_S(D_{y_i})\) and the out-of-class accuracy \(a_S(D_{-y_i})\) can be calculated. By using these tables and setting \(\{S^{(1)}, \dots, S^{(K)}\} = 2^{T_{-y_i}}\) and \(\{\sigma^{(1)}, \dots, \sigma^{(L)}\} = \Pi(T_{y_i})\), where \(2^M\) denotes the powerset of \(M\), the Monte Carlo estimator can be evaluated exactly. The details of the derivation are left to the eager reader.
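
The following sketch reproduces one row of those tables from the closed-form solution. It is illustrative only; the helper names are not part of pyDVL.

import numpy as np

# Train and test sets from the notes above, as (x, y) pairs
train = [(1, 0), (2, 0), (3, 1), (4, 1)]
test = [(1, 0), (2, 0), (3, 0), (4, 1)]

def fit_beta(subset):
    """Closed-form solution beta = dot(x, y) / dot(x, x)."""
    x = np.array([p[0] for p in subset], dtype=float)
    y = np.array([p[1] for p in subset], dtype=float)
    return float(x @ y / (x @ x))

def accuracy(beta, points):
    """Accuracy of y = max(0, min(1, round(beta * x))) on the given points."""
    hits = [max(0, min(1, round(beta * x))) == y for x, y in points]
    return sum(hits) / len(hits)

# Example row of the tables: model fitted on the full training set,
# scored on the in-class (label 0) and out-of-class (label 1) test points.
beta = fit_beta(train)  # 7/30
print(accuracy(beta, [p for p in test if p[1] == 0]))  # a_S(D_0) = 2/3
print(accuracy(beta, [p for p in test if p[1] == 1]))  # a_S(D_1) = 1.0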

References


  1. Schoch, Stephanie, Haifeng Xu, and Yangfeng Ji. CS-Shapley: Class-wise Shapley Values for Data Valuation in Classification. In Proc. of the Thirty-Sixth Conference on Neural Information Processing Systems (NeurIPS). New Orleans, Louisiana, USA, 2022. 

ClasswiseScorer

ClasswiseScorer(
    scoring: Union[str, ScorerCallable] = "accuracy",
    default: float = 0.0,
    range: Tuple[float, float] = (0, 1),
    in_class_discount_fn: Callable[[float], float] = lambda x: x,
    out_of_class_discount_fn: Callable[[float], float] = np.exp,
    initial_label: Optional[int] = None,
    name: Optional[str] = None,
)

Bases: Scorer

A Scorer designed for evaluation in classification problems. Its value is computed from an in-class and an out-of-class "inner score" (Schoch et al., 2022) 1. Let \(S\) be the training set and \(D\) be the valuation set. For each label \(c\), \(D\) is partitioned into two disjoint sets: \(D_c\) with the in-class instances and \(D_{-c}\) with the out-of-class instances. The score combines an in-class metric of performance with a discounted out-of-class metric. The inner scores can be provided upon construction and default to accuracy. They are combined into:

\[ u(S_{y_i}) = f(a_S(D_{y_i}))\ g(a_S(D_{-y_i})), \]

where \(f\) and \(g\) are continuous, monotonic functions. For a detailed explanation, refer to section four of (Schoch et al., 2022) 1.
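
As a small numeric illustration, with the default discount functions of the constructor below (identity for the in-class score, \(\exp\) for the out-of-class score) and assumed accuracies of 0.8 and 0.6:

import numpy as np

f = lambda x: x   # in_class_discount_fn (default: identity)
g = np.exp        # out_of_class_discount_fn (default: np.exp)

a_in, a_out = 0.8, 0.6      # assumed in-class / out-of-class accuracies
print(f(a_in) * g(a_out))   # u = 0.8 * e^0.6 ≈ 1.458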

Warning

Metrics must support multiple class labels if you intend to apply them to a multi-class problem. For instance, the metric 'accuracy' supports multiple classes, but the metric 'f1' does not. For a two-class classification problem, using 'f1_weighted' is essentially equivalent to using 'accuracy'.

PARAMETER DESCRIPTION
scoring

Name of the scoring function or a callable that can be passed to Scorer.

TYPE: Union[str, ScorerCallable] DEFAULT: 'accuracy'

default

Score to use when a model fails to provide a number, e.g. when too little data was used to train it, or when errors arise.

TYPE: float DEFAULT: 0.0

range

Numerical range of the score function. Some Monte Carlo methods can use this to estimate the number of samples required for a certain quality of approximation. If not provided, it can be read from the scoring object if it provides it, for instance if it was constructed with compose_score.

TYPE: Tuple[float, float] DEFAULT: (0, 1)

in_class_discount_fn

Continuous, monotonically increasing function used to discount the in-class score.

TYPE: Callable[[float], float] DEFAULT: lambda x: x

out_of_class_discount_fn

Continuous, monotonically increasing function used to discount the out-of-class score.

TYPE: Callable[[float], float] DEFAULT: exp

initial_label

Initial label to use in the first iteration.

TYPE: Optional[int] DEFAULT: None

name

Name of the scorer. If not provided, the name of the inner scoring function is prefixed with "classwise".

TYPE: Optional[str] DEFAULT: None

New in version 0.7.1
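
A brief usage sketch (the custom discount function in the second scorer is an arbitrary, illustrative choice):

import numpy as np
from pydvl.value.shapley.classwise import ClasswiseScorer

# Default configuration: accuracy as inner score, identity in-class
# discount, exponential out-of-class discount
scorer = ClasswiseScorer("accuracy", initial_label=0)

# Variant with a milder out-of-class discount (illustrative choice)
sqrt_scorer = ClasswiseScorer("accuracy", out_of_class_discount_fn=np.sqrt)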

Source code in src/pydvl/value/shapley/classwise.py
def __init__(
    self,
    scoring: Union[str, ScorerCallable] = "accuracy",
    default: float = 0.0,
    range: Tuple[float, float] = (0, 1),
    in_class_discount_fn: Callable[[float], float] = lambda x: x,
    out_of_class_discount_fn: Callable[[float], float] = np.exp,
    initial_label: Optional[int] = None,
    name: Optional[str] = None,
):
    disc_score_in_class = in_class_discount_fn(range[1])
    disc_score_out_of_class = out_of_class_discount_fn(range[1])
    transformed_range = (0, disc_score_in_class * disc_score_out_of_class)
    super().__init__(
        scoring=scoring,
        range=transformed_range,
        default=default,
        name=name or f"classwise {str(scoring)}",
    )
    self._in_class_discount_fn = in_class_discount_fn
    self._out_of_class_discount_fn = out_of_class_discount_fn
    self.label = initial_label
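
As a quick check of the range transformation in this constructor: with the default range=(0, 1) and discount functions, the transformed upper bound is \(e\). A minimal sketch mirroring the first three lines above:

import numpy as np

# Mirrors the constructor's range transformation with default arguments
disc_in = (lambda x: x)(1.0)    # in_class_discount_fn(range[1])
disc_out = np.exp(1.0)          # out_of_class_discount_fn(range[1])
print((0, disc_in * disc_out))  # transformed range: (0, e) ≈ (0, 2.718)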

estimate_in_class_and_out_of_class_score

estimate_in_class_and_out_of_class_score(
    model: SupervisedModel,
    x_test: NDArray[float_],
    y_test: NDArray[int_],
    rescale_scores: bool = True,
) -> Tuple[float, float]

Computes in-class and out-of-class scores using the provided inner scoring function. The result is

\[ a_S(D=\{(x_1, y_1), \dots, (x_K, y_K)\}) = \frac{1}{K} \sum_{k=1}^K s(y(x_k), y_k). \]

In this context, for label \(c\), calculations are executed twice: once for \(D_c\) and once for \(D_{-c}\) to determine the in-class and out-of-class scores, respectively. By default, the raw scores are multiplied by \(\frac{|D_c|}{|D|}\) and \(\frac{|D_{-c}|}{|D|}\), respectively. This is done to ensure that both scores are of the same order of magnitude. This normalization is particularly useful when the inner score function \(a_S\) is calculated by an estimator of the form \(\frac{1}{N} \sum_i x_i\), e.g. the accuracy.
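
A small numeric check of this rescaling, with assumed class sizes and raw accuracies. Note that when the inner score is accuracy, the two rescaled scores add up to the overall accuracy on \(D\):

n_in, n_out = 30, 70        # assumed class sizes |D_c|, |D_{-c}|
a_in, a_out = 0.9, 0.6      # assumed raw in-/out-of-class accuracies

in_rescaled = a_in * n_in / (n_in + n_out)      # 0.27
out_rescaled = a_out * n_out / (n_in + n_out)   # 0.42
print(in_rescaled + out_rescaled)               # 0.69, the overall accuracy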

PARAMETER DESCRIPTION
model

Model used for computing the score on the validation set.

TYPE: SupervisedModel

x_test

Array containing the features of the classification problem.

TYPE: NDArray[float_]

y_test

Array containing the labels of the classification problem.

TYPE: NDArray[int_]

rescale_scores

If set to True, the scores are rescaled by the fraction of in-class and out-of-class samples, respectively, as described above. This is particularly useful when the inner score function \(a_S\) is calculated by an estimator of the form \(\frac{1}{N} \sum_i x_i\).

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Tuple[float, float]

Tuple containing the in-class and out-of-class scores.

Source code in src/pydvl/value/shapley/classwise.py
def estimate_in_class_and_out_of_class_score(
    self,
    model: SupervisedModel,
    x_test: NDArray[np.float_],
    y_test: NDArray[np.int_],
    rescale_scores: bool = True,
) -> Tuple[float, float]:
    r"""
    Computes in-class and out-of-class scores using the provided inner
    scoring function. The result is

    $$
    a_S(D=\{(x_1, y_1), \dots, (x_K, y_K)\}) = \frac{1}{K} \sum_{k=1}^K s(y(x_k), y_k).
    $$

    In this context, for label $c$, calculations are executed twice: once for $D_c$
    and once for $D_{-c}$ to determine the in-class and out-of-class scores,
    respectively. By default, the raw scores are multiplied by $\frac{|D_c|}{|D|}$
    and $\frac{|D_{-c}|}{|D|}$, respectively. This is done to ensure that both
    scores are of the same order of magnitude. This normalization is particularly
    useful when the inner score function $a_S$ is calculated by an estimator of the
    form $\frac{1}{N} \sum_i x_i$, e.g. the accuracy.

    Args:
        model: Model used for computing the score on the validation set.
        x_test: Array containing the features of the classification problem.
        y_test: Array containing the labels of the classification problem.
        rescale_scores: If set to True, the scores are rescaled by the fraction of
            in-class and out-of-class samples, respectively, as described above. This
            is particularly useful when the inner score function $a_S$ is calculated
            by an estimator of the form $\frac{1}{N} \sum_i x_i$.

    Returns:
        Tuple containing the in-class and out-of-class scores.
    """
    scorer = self._scorer
    label_set_match = y_test == self.label
    label_set = np.where(label_set_match)[0]
    num_classes = len(np.unique(y_test))

    if len(label_set) == 0:
        return 0, 1 / (num_classes - 1)

    complement_label_set = np.where(~label_set_match)[0]
    in_class_score = scorer(model, x_test[label_set], y_test[label_set])
    out_of_class_score = scorer(
        model, x_test[complement_label_set], y_test[complement_label_set]
    )

    if rescale_scores:
        n_in_class = np.count_nonzero(y_test == self.label)
        n_out_of_class = len(y_test) - n_in_class
        in_class_score *= n_in_class / (n_in_class + n_out_of_class)
        out_of_class_score *= n_out_of_class / (n_in_class + n_out_of_class)

    return in_class_score, out_of_class_score

compute_classwise_shapley_values

compute_classwise_shapley_values(
    u: Utility,
    *,
    done: StoppingCriterion,
    truncation: TruncationPolicy,
    done_sample_complements: Optional[StoppingCriterion] = None,
    normalize_values: bool = True,
    use_default_scorer_value: bool = True,
    min_elements_per_label: int = 1,
    n_jobs: int = 1,
    parallel_backend: Optional[ParallelBackend] = None,
    config: Optional[ParallelConfig] = None,
    progress: bool = False,
    seed: Optional[Seed] = None
) -> ValuationResult

Computes an approximate Class-wise Shapley value by sampling independent permutations of the index set for each label and index sets from the powerset of the complement (with respect to the currently evaluated label), approximating the sum:

\[ v_u(i) = \frac{1}{K} \sum_k \frac{1}{L} \sum_l [u(\sigma^{(l)}_{:i} \cup \{i\} | S^{(k)} ) - u( \sigma^{(l)}_{:i} | S^{(k)})], \]

where \(\sigma^{(l)}_{:i}\) denotes the set of indices in permutation \(\sigma^{(l)}\) before the position where \(i\) appears and \(S^{(k)}\) is a subset of the index set of all other labels (see the main documentation for details).

PARAMETER DESCRIPTION
u

Utility object containing model, data, and scoring function. The scorer must be of type ClasswiseScorer.

TYPE: Utility

done

Function that checks whether the computation needs to stop.

TYPE: StoppingCriterion

truncation

Callable function that decides whether to interrupt processing a permutation and set subsequent marginals to zero.

TYPE: TruncationPolicy

done_sample_complements

Stopping criterion for the sampling of complement sets: for each permutation, conditional (complement) sets are resampled until this criterion is met, which controls how many complement sets are drawn per permutation. If None, a single complement set is sampled per permutation.

TYPE: Optional[StoppingCriterion] DEFAULT: None

normalize_values

Indicates whether to normalize the values by the variation in each class times their in-class accuracy.

TYPE: bool DEFAULT: True

use_default_scorer_value

The first set of indices is the sampled complement set. Unless otherwise specified, the default scorer value is used for it. If set to False, the base score is calculated from the utility.

TYPE: bool DEFAULT: True

min_elements_per_label

The minimum number of elements of each label, other than the one currently being evaluated, that a sampled complement set must contain.

TYPE: int DEFAULT: 1

n_jobs

Number of parallel jobs to run.

TYPE: int DEFAULT: 1

parallel_backend

Parallel backend instance to use for parallelizing computations. If None, use JoblibParallelBackend backend. See the Parallel Backends package for available options.

TYPE: Optional[ParallelBackend] DEFAULT: None

config

(DEPRECATED) Object configuring parallel computation, with cluster address, number of cpus, etc.

TYPE: Optional[ParallelConfig] DEFAULT: None

progress

Whether to display a progress bar.

TYPE: bool DEFAULT: False

seed

Either an instance of a numpy random number generator or a seed for it.

TYPE: Optional[Seed] DEFAULT: None

RETURNS DESCRIPTION
ValuationResult

ValuationResult object containing computed data values.

New in version 0.7.1
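
An end-to-end sketch of how these pieces fit together, assuming a scikit-learn classifier on a synthetic dataset; the stopping and truncation settings are illustrative choices, not recommendations:

from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression

from pydvl.utils import Dataset, Utility
from pydvl.value.shapley.classwise import (
    ClasswiseScorer,
    compute_classwise_shapley_values,
)
from pydvl.value.shapley.truncated import RelativeTruncation
from pydvl.value.stopping import MaxChecks, MaxUpdates

# Synthetic binary classification problem (illustrative)
X, y = make_classification(n_samples=100, random_state=0)
data = Dataset.from_arrays(X, y, train_size=0.7)

u = Utility(
    model=LogisticRegression(),
    data=data,
    scorer=ClasswiseScorer("accuracy"),
)

values = compute_classwise_shapley_values(
    u,
    done=MaxUpdates(500),                    # outer stopping criterion
    truncation=RelativeTruncation(u, rtol=0.01),
    done_sample_complements=MaxChecks(1),    # one complement set per permutation
    n_jobs=1,
)
print(values.values[:5])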

Source code in src/pydvl/value/shapley/classwise.py
@deprecated(
    target=True,
    args_mapping={"config": "config"},
    deprecated_in="0.9.0",
    remove_in="0.10.0",
)
def compute_classwise_shapley_values(
    u: Utility,
    *,
    done: StoppingCriterion,
    truncation: TruncationPolicy,
    done_sample_complements: Optional[StoppingCriterion] = None,
    normalize_values: bool = True,
    use_default_scorer_value: bool = True,
    min_elements_per_label: int = 1,
    n_jobs: int = 1,
    parallel_backend: Optional[ParallelBackend] = None,
    config: Optional[ParallelConfig] = None,
    progress: bool = False,
    seed: Optional[Seed] = None,
) -> ValuationResult:
    r"""
    Computes an approximate Class-wise Shapley value by sampling independent
    permutations of the index set for each label and index sets from the
    powerset of the complement (with respect to the currently evaluated label),
    approximating the sum:

    $$
    v_u(i) = \frac{1}{K} \sum_k \frac{1}{L} \sum_l
    [u(\sigma^{(l)}_{:i} \cup \{i\} | S^{(k)} ) - u( \sigma^{(l)}_{:i} | S^{(k)})],
    $$

    where $\sigma^{(l)}_{:i}$ denotes the set of indices in permutation $\sigma^{(l)}$
    before the position where $i$ appears and $S^{(k)}$ is a subset of the index
    set of all other labels (see [the main documentation][class-wise-shapley] for
    details).

    Args:
        u: Utility object containing model, data, and scoring function. The
            scorer must be of type
            [ClasswiseScorer][pydvl.value.shapley.classwise.ClasswiseScorer].
        done: Function that checks whether the computation needs to stop.
        truncation: Callable function that decides whether to interrupt processing a
            permutation and set subsequent marginals to zero.
        done_sample_complements: Stopping criterion for the sampling of complement
            sets: for each permutation, conditional (complement) sets are resampled
            until this criterion is met. If `None`, a single complement set is
            sampled per permutation.
        normalize_values: Indicates whether to normalize the values by the variation
            in each class times their in-class accuracy.
        use_default_scorer_value: The first set of indices is the sampled complement
            set. Unless otherwise specified, the default scorer value is used for
            it. If set to `False`, the base score is calculated from the utility.
        min_elements_per_label: The minimum number of elements of each label, other
            than the one currently being evaluated, that a sampled complement set
            must contain.
        n_jobs: Number of parallel jobs to run.
        parallel_backend: Parallel backend instance to use
            for parallelizing computations. If `None`,
            use [JoblibParallelBackend][pydvl.parallel.backends.JoblibParallelBackend] backend.
            See the [Parallel Backends][pydvl.parallel.backends] package
            for available options.
        config: (**DEPRECATED**) Object configuring parallel computation,
            with cluster address, number of cpus, etc.
        progress: Whether to display a progress bar.
        seed: Either an instance of a numpy random number generator or a seed for it.

    Returns:
        ValuationResult object containing computed data values.

    !!! tip "New in version 0.7.1"
    """
    dim_correct = u.data.y_train.ndim == 1 and u.data.y_test.ndim == 1
    is_integral = all(
        map(
            lambda v: isinstance(v, numbers.Integral), (*u.data.y_train, *u.data.y_test)
        )
    )
    if not dim_correct or not is_integral:
        raise ValueError(
            "The supplied dataset has to be a 1-dimensional classification dataset."
        )

    if not isinstance(u.scorer, ClasswiseScorer):
        raise ValueError(
            "Please set a subclass of ClasswiseScorer object as scorer object of the"
            " utility. See scoring argument of Utility."
        )

    parallel_backend = _maybe_init_parallel_backend(parallel_backend, config)
    u_ref = parallel_backend.put(u)
    n_jobs = parallel_backend.effective_n_jobs(n_jobs)
    n_submitted_jobs = 2 * n_jobs

    pbar = tqdm(disable=not progress, position=0, total=100, unit="%")
    algorithm = "classwise_shapley"
    accumulated_result = ValuationResult.zeros(
        algorithm=algorithm, indices=u.data.indices, data_names=u.data.data_names
    )
    terminate_exec = False
    seed_sequence = ensure_seed_sequence(seed)

    with parallel_backend.executor(max_workers=n_jobs) as executor:
        pending: Set[Future] = set()
        while True:
            completed_futures, pending = wait(
                pending, timeout=60, return_when=FIRST_COMPLETED
            )
            for future in completed_futures:
                accumulated_result += future.result()
                if done(accumulated_result):
                    terminate_exec = True
                    break

            pbar.n = 100 * done.completion()
            pbar.refresh()
            if terminate_exec:
                break

            n_remaining_slots = n_submitted_jobs - len(pending)
            seeds = seed_sequence.spawn(n_remaining_slots)
            for i in range(n_remaining_slots):
                future = executor.submit(
                    _permutation_montecarlo_classwise_shapley_one_step,
                    u_ref,
                    truncation=truncation,
                    done_sample_complements=done_sample_complements,
                    use_default_scorer_value=use_default_scorer_value,
                    min_elements_per_label=min_elements_per_label,
                    algorithm_name=algorithm,
                    seed=seeds[i],
                )
                pending.add(future)

    result = accumulated_result
    if normalize_values:
        result = _normalize_classwise_shapley_values(result, u)

    return result