Skip to content

pydvl.valuation.samplers.msr

This module implements Maximum Sample Re-use (MSR) sampling for valuation, as described in (Wang et al.)1.

The idea behind MSR is to update all indices in the dataset with every evaluation of the utility function on a sample. Updates are divided into positive, if the index is in the sample, and negative, if it is not. The two running means are later combined into a final result.

Note that this requires defining a special evaluation strategy and result updater, as returned by the make_strategy and result_updater methods, respectively.

For more on the general architecture of samplers see pydvl.valuation.samplers.

References


  1. Wang, J.T. and Jia, R., 2023. Data Banzhaf: A Robust Data Valuation Framework for Machine Learning. In: Proceedings of The 26th International Conference on Artificial Intelligence and Statistics, pp. 6388-6421. 

MSRValueUpdate dataclass

MSRValueUpdate(idx: IndexT, log_update: float, sign: int, in_sample: bool)

Bases: ValueUpdate

Update for Maximum Sample Re-use (MSR) valuation (in log space). Attributes: in_sample: Whether the index to be updated was in the sample.

Source code in src/pydvl/valuation/samplers/msr.py
def __init__(self, idx: IndexT, log_update: float, sign: int, in_sample: bool):
    object.__setattr__(self, "idx", idx)
    object.__setattr__(self, "log_update", log_update)
    object.__setattr__(self, "sign", sign)
    object.__setattr__(self, "in_sample", in_sample)
    object.__setattr__(self, "update", np.exp(log_update) * sign)

MSRResultUpdater

MSRResultUpdater(result: ValuationResult)

Bases: ResultUpdater[MSRValueUpdate]

Update running means for MSR valuation (in log-space).

This class is used to update two running means for positive and negative updates separately. The two running means are later combined into a final result.

Since values computed with MSR are not a mean over marginals, both the variances of the marginals and the update counts are ill-defined. We use the following conventions:

  1. The counts are defined as the minimum of the two counts. This definition enables us to ensure a minimal number of updates for both running means via stopping criteria and correctly detects that no actual update has taken place if one of the counts is zero.

  2. We reverse engineer the variances so that they yield correct standard errors given our convention for the counts and the normal calculation of standard errors in the valuation result.

Note that we cannot use the normal addition or subtraction defined by the ValuationResult because it is weighted with counts. If we were to simply subtract the negative result from the positive we would get wrong variance estimates, misleading update counts and even wrong values if no further precaution is taken.

Source code in src/pydvl/valuation/samplers/msr.py
def __init__(self, result: ValuationResult):
    self.result = result
    self.in_sample = ValuationResult.zeros(
        algorithm=result.algorithm, indices=result.indices, data_names=result.names
    )
    self.out_of_sample = ValuationResult.zeros(
        algorithm=result.algorithm, indices=result.indices, data_names=result.names
    )

    self.update_in_sample = LogResultUpdater[MSRValueUpdate](self.in_sample)
    self.update_out_of_sample = LogResultUpdater[MSRValueUpdate](self.out_of_sample)

combine_results

combine_results() -> ValuationResult

Combine the positive and negative running means into a final result. Returns: The combined valuation result.

Verify that the two running means are statistically independent (which is

assumed in the aggregation of variances).

Source code in src/pydvl/valuation/samplers/msr.py
def combine_results(self) -> ValuationResult:
    """Combine the positive and negative running means into a final result.
    Returns:
        The combined valuation result.

    TODO: Verify that the two running means are statistically independent (which is
        assumed in the aggregation of variances).
    """
    # define counts as minimum of the two counts (see docstring)
    counts = np.minimum(self.in_sample.counts, self.out_of_sample.counts)

    values = self.in_sample.values - self.out_of_sample.values
    values[counts == 0] = np.nan

    # define variances that yield correct standard errors (see docstring)
    pos_var = self.in_sample.variances / np.clip(self.in_sample.counts, 1, np.inf)
    neg_var = self.out_of_sample.variances / np.clip(
        self.out_of_sample.counts, 1, np.inf
    )
    variances = np.where(counts != 0, (pos_var + neg_var) * counts, np.inf)

    self.result = ValuationResult(
        values=values,
        variances=variances,
        counts=counts,
        indices=self.result.indices,
        data_names=self.result.names,
        algorithm=self.result.algorithm,
    )

    return self.result

MSRSampler

MSRSampler(batch_size: int = 1, seed: Seed | None = None)

Bases: StochasticSamplerMixin, IndexSampler[MSRValueUpdate]

Sampler for unweighted Maximum Sample Re-use (MSR) valuation.

The sampling is similar to a UniformSampler but without an outer index. However,the MSR sampler uses a special evaluation strategy and result updater, as returned by the make_strategy() and result_updater() methods, respectively.

Two running means are updated separately for positive and negative updates. The two running means are later combined into a final result.

PARAMETER DESCRIPTION
batch_size

Number of samples to generate in each batch.

TYPE: int DEFAULT: 1

seed

Seed for the random number generator.

TYPE: Seed | None DEFAULT: None

Source code in src/pydvl/valuation/samplers/msr.py
def __init__(self, batch_size: int = 1, seed: Seed | None = None):
    super().__init__(batch_size=batch_size, seed=seed)

skip_indices property writable

skip_indices: IndexSetT

Indices being skipped in the sampler. The exact behaviour will be sampler-dependent, so that setting this property is disabled by default.

interrupt

interrupt()

Signals the sampler to stop generating samples after the current batch.

Source code in src/pydvl/valuation/samplers/base.py
def interrupt(self):
    """Signals the sampler to stop generating samples after the current batch."""
    self._interrupted = True

__len__

__len__() -> int

Returns the length of the current sample generation in generate_batches.

RAISES DESCRIPTION
`TypeError`

if the sampler is infinite or generate_batches has not been called yet.

Source code in src/pydvl/valuation/samplers/base.py
def __len__(self) -> int:
    """Returns the length of the current sample generation in generate_batches.

    Raises:
        `TypeError`: if the sampler is infinite or
            [generate_batches][pydvl.valuation.samplers.IndexSampler.generate_batches]
            has not been called yet.
    """
    if self._len is None:
        raise TypeError(f"This {self.__class__.__name__} has no length")
    return self._len

generate_batches

generate_batches(indices: IndexSetT) -> BatchGenerator

Batches the samples and yields them.

Source code in src/pydvl/valuation/samplers/base.py
def generate_batches(self, indices: IndexSetT) -> BatchGenerator:
    """Batches the samples and yields them."""
    self._len = self.sample_limit(indices)

    # Create an empty generator if the indices are empty: `return` acts like a
    # `break`, and produces an empty generator.
    if len(indices) == 0:
        return

    self._interrupted = False
    self._n_samples = 0
    for batch in chunked(self.generate(indices), self.batch_size):
        self._n_samples += len(batch)
        yield batch
        if self._interrupted:
            break

log_weight

log_weight(n: int, subset_len: int) -> float

Probability of sampling a set of size k.

In the MSR scheme, the sampling is done from the full power set \(2^N\) (each set \(S \subseteq N\) with probability \(1 / 2^n\)), and then for each data point \(i\) one partitions the sample into:

* $\mathcal{S}_{\ni i} = \{S \in \mathcal{S}: i \in S\},$ and
* $\mathcal{S}_{\nni i} = \{S \in \mathcal{S}: i \nin S\}.$.

When we condition on the event \(i \in S\), the remaining part \(S_{- i}\) is uniformly distributed over \(2^{N_{- i}}\). In other words, the act of partitioning recovers the uniform distribution on \(2^{N_{- i}}\) "for free" because

\[P (S_{- i} = T \mid i \in S) = \frac{1}{2^{n - 1}},\]

for each \(T \subseteq N_{- i}\).

PARAMETER DESCRIPTION
n

Size of the index set.

TYPE: int

subset_len

Size of the subset.

TYPE: int

RETURNS DESCRIPTION
float

The logarithm of the probability of having sampled a set of size subset_len.

Source code in src/pydvl/valuation/samplers/msr.py
def log_weight(self, n: int, subset_len: int) -> float:
    r"""Probability of sampling a set of size k.

    In the **MSR scheme**, the sampling is done from the full power set $2^N$ (each
    set $S \subseteq N$ with probability $1 / 2^n$), and then for each data point
    $i$ one partitions the sample into:

        * $\mathcal{S}_{\ni i} = \{S \in \mathcal{S}: i \in S\},$ and
        * $\mathcal{S}_{\nni i} = \{S \in \mathcal{S}: i \nin S\}.$.

    When we condition on the event $i \in S$, the remaining part $S_{- i}$ is
    uniformly distributed over $2^{N_{- i}}$. In other words, the act of
    partitioning recovers the uniform distribution on $2^{N_{- i}}$ "for free"
    because

    $$P (S_{- i} = T \mid i \in S) = \frac{1}{2^{n - 1}},$$

    for each $T \subseteq N_{- i}$.

    Args:
        n: Size of the index set.
        subset_len: Size of the subset.

    Returns:
        The logarithm of the probability of having sampled a set of size
            `subset_len`.
    """
    return float(-(n - 1) * np.log(2)) if n > 0 else 0.0

make_strategy

make_strategy(
    utility: UtilityBase, coefficient: Callable[[int, int], float] | None = None
) -> MSREvaluationStrategy

Returns the strategy for this sampler.

PARAMETER DESCRIPTION
utility

Utility function to evaluate.

TYPE: UtilityBase

coefficient

Coefficient function for the utility function.

TYPE: Callable[[int, int], float] | None DEFAULT: None

Source code in src/pydvl/valuation/samplers/msr.py
def make_strategy(
    self,
    utility: UtilityBase,
    coefficient: Callable[[int, int], float] | None = None,
) -> MSREvaluationStrategy:
    """Returns the strategy for this sampler.

    Args:
        utility: Utility function to evaluate.
        coefficient: Coefficient function for the utility function.
    """
    assert coefficient is not None
    return MSREvaluationStrategy(self, utility, coefficient)

result_updater

result_updater(result: ValuationResult) -> ResultUpdater

Returns a callable that updates a valuation result with an MSR value update.

MSR updates two running means for positive and negative updates separately. The two running means are later combined into a final result.

PARAMETER DESCRIPTION
result

The valuation result to update with each call of the returned callable.

TYPE: ValuationResult

Returns: A callable object that updates the valuation result with very MSRValueUpdate.

Source code in src/pydvl/valuation/samplers/msr.py
def result_updater(self, result: ValuationResult) -> ResultUpdater:
    """Returns a callable that updates a valuation result with an MSR value update.

    MSR updates two running means for positive and negative updates separately. The
    two running means are later combined into a final result.

    Args:
        result: The valuation result to update with each call of the returned
            callable.
    Returns:
        A callable object that updates the valuation result with very
            [MSRValueUpdate][pydvl.valuation.samplers.msr.MSRValueUpdate].
    """
    return MSRResultUpdater(result)

MSREvaluationStrategy

MSREvaluationStrategy(
    sampler: SamplerT,
    utility: UtilityBase,
    log_coefficient: Callable[[int, int], float] | None = None,
)

Bases: EvaluationStrategy[MSRSampler, MSRValueUpdate]

Evaluation strategy for Maximum Sample Re-use (MSR) valuation in log space.

The MSR evaluation strategy makes one utility evaluation per sample but generates n_indices many updates from it. The updates will be used to update two running means that will later be combined into a final value. We use the field ValueUpdate.in_sample field to inform MSRResultUpdater of which of the two running means must be updated.

Source code in src/pydvl/valuation/samplers/base.py
def __init__(
    self,
    sampler: SamplerT,
    utility: UtilityBase,
    log_coefficient: Callable[[int, int], float] | None = None,
):
    self.utility = utility
    # Used by the decorator suppress_warnings:
    self.show_warnings = getattr(utility, "show_warnings", False)
    self.n_indices = (
        len(utility.training_data) if utility.training_data is not None else 0
    )

    if log_coefficient is not None:

        def correction_fun(n: int, subset_len: int) -> float:
            return log_coefficient(n, subset_len) - sampler.log_weight(
                n, subset_len
            )

        self.log_correction = correction_fun
    else:
        self.log_correction = lambda n, subset_len: 0.0