Skip to content

pydvl.valuation.methods.gt_shapley

This module implements Group Testing for the approximation of Shapley values, as introduced in (Jia, R. et al., 2019).1

Computation of the Shapley value is transformed into a feasibility problem, where the constraints involve certain utility evaluations of sampled subsets. The sampling is done in such a way that an approximation to the true Shapley values can be computed with guarantees.3

Warning

Group Testing is very inefficient. Potential improvements to the implementation notwithstanding, convergence seems to be very slow (in terms of evaluations of the utility required). We recommend other Monte Carlo methods instead. See the whole list here.

You can read about data valuation in general in the documentation.

References


  1. Jia, R. et al., 2019. Towards Efficient Data Valuation Based on the Shapley Value. In: Proceedings of the 22nd International Conference on Artificial Intelligence and Statistics, pp. 1167–1176. PMLR. 

  2. Jia, R. et al., 2023. A Note on "Towards Efficient Data Valuation Based on the Shapley Value"

  3. Internally, this sampling is achieved with a StratifiedSampler with GroupTestingSampleSize strategy. 

GroupTestingProblem

Bases: NamedTuple

Solver agnostic representation of the group-testing problem.

GroupTestingShapleyValuation

GroupTestingShapleyValuation(
    utility: UtilityBase,
    n_samples: int,
    epsilon: float,
    solver_options: dict | None = None,
    progress: bool = True,
    seed: Seed | None = None,
    batch_size: int = 1,
)

Bases: Valuation

Class to calculate the group-testing approximation to shapley values.

See (Jia, R. et al., 2019)1 for a description of the method, and Data valuation for an overview of data valuation.

Warning

Group Testing is very inefficient. Potential improvements to the implementation notwithstanding, convergence seems to be very slow (in terms of evaluations of the utility required). We recommend other Monte Carlo methods instead. See the whole list here.

PARAMETER DESCRIPTION
utility

Utility object with model, data and scoring function.

TYPE: UtilityBase

n_samples

The number of samples to use. A sample size with theoretical guarantees can be computed using compute_n_samples().

TYPE: int

epsilon

The error tolerance.

TYPE: float

solver_options

Optional dictionary containing a CVXPY solver and options to configure it. For valid values to the "solver" key see this tutorial. For additional options cvxpy's documentation.

TYPE: dict | None DEFAULT: None

progress

Whether to show a progress bar during the construction of the group-testing problem.

TYPE: bool DEFAULT: True

seed

Seed for the random number generator.

TYPE: Seed | None DEFAULT: None

batch_size

The number of samples to draw in each batch. Can be used to reduce parallelization overhead for fast utilities. Defaults to 1.

TYPE: int DEFAULT: 1

Source code in src/pydvl/valuation/methods/gt_shapley.py
def __init__(
    self,
    utility: UtilityBase,
    n_samples: int,
    epsilon: float,
    solver_options: dict | None = None,
    progress: bool = True,
    seed: Seed | None = None,
    batch_size: int = 1,
):
    super().__init__()

    self._utility = utility
    self._n_samples = n_samples
    self._solver_options = solver_options
    self._progress = progress
    self._sampler = StratifiedSampler(
        index_iteration=NoIndexIteration,
        sample_sizes=GroupTestingSampleSize(),
        sample_sizes_iteration=RandomSizeIteration,
        batch_size=batch_size,
        seed=seed,
    )
    self._epsilon = epsilon

result property

The current valuation result (not a copy).

fit

fit(data: Dataset, continue_from: ValuationResult | None = None) -> Self

Calculate the group-testing valuation on a dataset.

This method has to be called before calling values().

Calculating the least core valuation is a computationally expensive task that can be parallelized. To do so, call the fit() method inside a joblib.parallel_config context manager as follows:

from joblib import parallel_config

with parallel_config(n_jobs=4):
    valuation.fit(data)
Source code in src/pydvl/valuation/methods/gt_shapley.py
def fit(self, data: Dataset, continue_from: ValuationResult | None = None) -> Self:
    """Calculate the group-testing valuation on a dataset.

    This method has to be called before calling `values()`.

    Calculating the least core valuation is a computationally expensive task that
    can be parallelized. To do so, call the `fit()` method inside a
    `joblib.parallel_config` context manager as follows:

    ```python
    from joblib import parallel_config

    with parallel_config(n_jobs=4):
        valuation.fit(data)
    ```

    """
    self._result = self._init_or_check_result(data, continue_from)
    self._utility = self._utility.with_dataset(data)

    problem = create_group_testing_problem(
        utility=self._utility,
        sampler=self._sampler,
        n_samples=self._n_samples,
        progress=self._progress,
        epsilon=self._epsilon,
    )

    solution = solve_group_testing_problem(
        problem=problem,
        solver_options=self._solver_options,
        algorithm_name=self.algorithm_name,
        data_names=data.names,
    )

    self._result += solution
    return self

values

values(sort: bool = False) -> ValuationResult

Returns a copy of the valuation result.

The valuation must have been run with fit() before calling this method.

PARAMETER DESCRIPTION
sort

Whether to sort the valuation result by value before returning it.

TYPE: bool DEFAULT: False

Returns: The result of the valuation.

Source code in src/pydvl/valuation/base.py
@deprecated(
    target=None,
    deprecated_in="0.10.0",
    remove_in="0.11.0",
)
def values(self, sort: bool = False) -> ValuationResult:
    """Returns a copy of the valuation result.

    The valuation must have been run with `fit()` before calling this method.

    Args:
        sort: Whether to sort the valuation result by value before returning it.
    Returns:
        The result of the valuation.
    """
    if not self.is_fitted:
        raise NotFittedException(type(self))
    assert self._result is not None

    r = self._result.copy()
    if sort:
        r.sort(inplace=True)
    return r

compute_n_samples

compute_n_samples(epsilon: float, delta: float, n_obs: int) -> int

Compute the minimal sample size with epsilon-delta guarantees.

Based on the formula in Theorem 4 of (Jia, R. et al., 2023)2 which gives a lower bound on the number of samples required to obtain an (ε/√n,δ/(N(N-1))-approximation to all pair-wise differences of Shapley values, wrt. \(\ell_2\) norm.

The updated version refines the lower bound of the original paper. Note that the bound is tighter than earlier versions but might still overestimate the number of samples required.

PARAMETER DESCRIPTION
epsilon

The error tolerance.

TYPE: float

delta

The confidence level.

TYPE: float

n_obs

Number of data points.

TYPE: int

RETURNS DESCRIPTION
int

The sample size.

Source code in src/pydvl/valuation/methods/gt_shapley.py
def compute_n_samples(epsilon: float, delta: float, n_obs: int) -> int:
    r"""Compute the minimal sample size with epsilon-delta guarantees.

    Based on the formula in Theorem 4 of
    (Jia, R. et al., 2023)<sup><a href="#jia_update_2023">2</a></sup>
    which gives a lower bound on the number of samples required to obtain an
    (ε/√n,δ/(N(N-1))-approximation to all pair-wise differences of Shapley
    values, wrt. $\ell_2$ norm.

    The updated version refines the lower bound of the original paper. Note that the
    bound is tighter than earlier versions but might still overestimate the number of
    samples required.

    Args:
        epsilon: The error tolerance.
        delta: The confidence level.
        n_obs: Number of data points.

    Returns:
        The sample size.

    """
    kk = _create_sample_sizes(n_obs)
    Z = _calculate_z(n_obs)

    q = _create_sampling_probabilities(kk)
    q_tot = (n_obs - 2) / n_obs * q[0] + np.inner(
        q[1:], 1 + 2 * kk[1:] * (kk[1:] - n_obs) / (n_obs * (n_obs - 1))
    )

    def _h(u: float) -> float:
        return cast(float, (1 + u) * np.log(1 + u) - u)

    n_samples = np.log(n_obs * (n_obs - 1) / delta)
    n_samples /= 1 - q_tot
    n_samples /= _h(epsilon / (2 * Z * np.sqrt(n_obs) * (1 - q_tot)))

    return int(n_samples)

create_group_testing_problem

create_group_testing_problem(
    utility: UtilityBase,
    sampler: IndexSampler,
    n_samples: int,
    progress: bool,
    epsilon: float,
) -> GroupTestingProblem

Create the feasibility problem that characterizes group testing shapley values.

PARAMETER DESCRIPTION
utility

Utility object with model, data and scoring function.

TYPE: UtilityBase

sampler

The sampler to use for the valuation.

TYPE: IndexSampler

n_samples

The number of samples to use.

TYPE: int

progress

Whether to show a progress bar.

TYPE: bool

epsilon

The error tolerance.

TYPE: float

RETURNS DESCRIPTION
GroupTestingProblem

A GroupTestingProblem object.

Source code in src/pydvl/valuation/methods/gt_shapley.py
def create_group_testing_problem(
    utility: UtilityBase,
    sampler: IndexSampler,
    n_samples: int,
    progress: bool,
    epsilon: float,
) -> GroupTestingProblem:
    """Create the feasibility problem that characterizes group testing shapley values.

    Args:
        utility: Utility object with model, data and scoring function.
        sampler: The sampler to use for the valuation.
        n_samples: The number of samples to use.
        progress: Whether to show a progress bar.
        epsilon: The error tolerance.

    Returns:
        A GroupTestingProblem object.

    """
    if utility.training_data is None:
        raise ValueError("Utility object must have training data.")

    u_values, masks = compute_utility_values_and_sample_masks(
        utility=utility,
        sampler=sampler,
        n_samples=n_samples,
        progress=progress,
        extra_samples=[Sample(idx=None, subset=utility.training_data.indices)],
    )

    total_utility = u_values[-1].item()
    u_values = u_values[:-1]
    masks = masks[:-1]

    u_differences = _calculate_utility_differences(
        utility_values=u_values, masks=masks, n_obs=len(utility.training_data)
    )

    problem = GroupTestingProblem(
        utility_differences=u_differences,
        total_utility=total_utility,
        epsilon=epsilon,
    )

    return problem

solve_group_testing_problem

solve_group_testing_problem(
    problem: GroupTestingProblem,
    solver_options: dict | None,
    algorithm_name: str,
    data_names: NDArray[NameT],
) -> ValuationResult

Solve the group testing problem and create a ValuationResult.

PARAMETER DESCRIPTION
problem

The group testing problem.

TYPE: GroupTestingProblem

solver_options

Optional dictionary containing a CVXPY solver and options to configure it. For valid values to the "solver" key see here. For additional options see here.

TYPE: dict | None

algorithm_name

The name of the algorithm.

TYPE: str

data_names

The names of the data columns.

TYPE: NDArray[NameT]

RETURNS DESCRIPTION
ValuationResult

A ValuationResult object.

Source code in src/pydvl/valuation/methods/gt_shapley.py
def solve_group_testing_problem(
    problem: GroupTestingProblem,
    solver_options: dict | None,
    algorithm_name: str,
    data_names: NDArray[NameT],
) -> ValuationResult:
    """Solve the group testing problem and create a ValuationResult.

    Args:
        problem: The group testing problem.
        solver_options: Optional dictionary containing a CVXPY solver and options to
            configure it. For valid values to the "solver" key see
            [here](https://www.cvxpy.org/tutorial/solvers/index.html#choosing-a-solver).
            For additional options see
            [here](https://www.cvxpy.org/tutorial/solvers/index.html#setting-solver-options).
        algorithm_name: The name of the algorithm.
        data_names: The names of the data columns.

    Returns:
        A ValuationResult object.

    """

    solver_options = {} if solver_options is None else solver_options.copy()

    C = problem.utility_differences
    total_utility = problem.total_utility
    epsilon = problem.epsilon
    n_obs = len(C)

    v = cp.Variable(n_obs)
    constraints = [cp.sum(v) == total_utility]
    for i in range(n_obs):
        for j in range(i + 1, n_obs):
            constraints.append(v[i] - v[j] <= epsilon + C[i, j])
            constraints.append(v[j] - v[i] <= epsilon - C[i, j])

    cp_problem = cp.Problem(cp.Minimize(0), constraints)
    solver = solver_options.pop("solver", cp.SCS)
    cp_problem.solve(solver=solver, **solver_options)

    if cp_problem.status != "optimal":
        log.warning(f"cvxpy returned status {cp_problem.status}")
        values = (
            np.nan * np.ones_like(n_obs)
            if not hasattr(v.value, "__len__")
            else cast(NDArray[np.float64], v.value)
        )
        status = Status.Failed
    else:
        values = cast(NDArray[np.float64], v.value)
        status = Status.Converged

    result = ValuationResult(
        status=status,
        values=values,
        data_names=data_names,
        solver_status=cp_problem.status,
        algorithm=algorithm_name,
    )

    return result