Skip to content

pydvl.valuation.methods._solve_least_core_problems

LeastCoreProblem

Bases: NamedTuple

The linear programming problem that defines the least-core valuation.

See LeastCoreValuation for details.

lc_solve_problem

lc_solve_problem(
    problem: LeastCoreProblem,
    *,
    u: UtilityBase,
    algorithm: str,
    non_negative_subsidy: bool = False,
    solver_options: dict | None = None
) -> ValuationResult

Solves a linear problem as prepared by [mclc_prepare_problem()][pydvl.valuation.create_least_core_problem]. Useful for parallel execution of multiple experiments by running this as a remote task.

PARAMETER DESCRIPTION
problem

LeastCoreProblem to solve.

TYPE: LeastCoreProblem

u

Utility object with model, data and scoring function.

TYPE: UtilityBase

algorithm

Name of the least-core valuation algorithm.

TYPE: str

non_negative_subsidy

If True, the least core subsidy \(e\) is constrained to be non-negative.

TYPE: bool DEFAULT: False

solver_options

Optional dictionary containing a CVXPY solver and options to configure it. For valid values to the "solver" key see here. For additional options see here.

TYPE: dict | None DEFAULT: None

Source code in src/pydvl/valuation/methods/_solve_least_core_problems.py
def lc_solve_problem(
    problem: LeastCoreProblem,
    *,
    u: UtilityBase,
    algorithm: str,
    non_negative_subsidy: bool = False,
    solver_options: dict | None = None,
) -> ValuationResult:
    """Solves a linear problem as prepared by
    [mclc_prepare_problem()][pydvl.valuation.create_least_core_problem].
    Useful for parallel execution of multiple experiments by running this as a
    remote task.

    Args:
        problem: LeastCoreProblem to solve.
        u: Utility object with model, data and scoring function.
        algorithm: Name of the least-core valuation algorithm.
        non_negative_subsidy: If True, the least core subsidy $e$ is constrained
            to be non-negative.
        solver_options: Optional dictionary containing a CVXPY solver and options to
            configure it. For valid values to the "solver" key see
            [here](https://www.cvxpy.org/tutorial/solvers/index.html#choosing-a-solver).
            For additional options see [here](https://www.cvxpy.org/tutorial/solvers/index.html#setting-solver-options).

    """
    if u.training_data is not None:
        n_obs = len(u.training_data)
    else:
        raise ValueError("Utility object must have a training dataset.")

    if np.any(np.isnan(problem.utility_values)):
        warnings.warn(
            f"Calculation returned "
            f"{np.sum(np.isnan(problem.utility_values))} NaN "
            f"values out of {problem.utility_values.size}",
            RuntimeWarning,
        )

    if solver_options is None:
        solver_options = {}
    else:
        solver_options = solver_options.copy()

    if "solver" not in solver_options:
        solver_options["solver"] = cp.SCS

    if "max_iters" not in solver_options and solver_options["solver"] == cp.SCS:
        solver_options["max_iters"] = 10000

    logger.debug("Removing possible duplicate values in lower bound array")
    b_lb = problem.utility_values
    A_lb, unique_indices = np.unique(problem.A_lb, return_index=True, axis=0)
    b_lb = b_lb[unique_indices]

    logger.debug("Building equality constraint")
    A_eq = np.ones((1, n_obs))
    # We might have already computed the total utility one or more times.
    # This is the index of the row(s) in A_lb with all ones.
    total_utility_indices = np.where(A_lb.sum(axis=1) == n_obs)[0]
    if len(total_utility_indices) == 0:
        b_eq = np.array([u(Sample(idx=None, subset=u.training_data.indices))])
    else:
        b_eq = b_lb[total_utility_indices]
        # Remove the row(s) corresponding to the total utility
        # from the lower bound constraints
        # because given the equality constraint
        # it is the same as using the constraint e >= 0
        # (i.e. setting non_negative_subsidy = True).
        mask: NDArray[np.bool_] = np.ones_like(b_lb, dtype=bool)
        mask[total_utility_indices] = False
        b_lb = b_lb[mask]
        A_lb = A_lb[mask]

    # Remove the row(s) corresponding to the empty subset
    # because, given u(∅) = (which is almost always the case,
    # it is the same as using the constraint e >= 0
    # (i.e. setting non_negative_subsidy = True).
    emptyset_utility_indices = np.where(A_lb.sum(axis=1) == 0)[0]
    if len(emptyset_utility_indices) > 0:
        mask = np.ones_like(b_lb, dtype=bool)
        mask[emptyset_utility_indices] = False
        b_lb = b_lb[mask]
        A_lb = A_lb[mask]

    _, subsidy = _solve_least_core_linear_program(
        A_eq=A_eq,
        b_eq=b_eq,
        A_lb=A_lb,
        b_lb=b_lb,
        non_negative_subsidy=non_negative_subsidy,
        solver_options=solver_options,
    )

    values: NDArray[np.float_] | None

    if subsidy is None:
        logger.debug("No values were found")
        status = Status.Failed
        values = np.empty(n_obs)
        values[:] = np.nan
        subsidy = np.nan
    else:
        values = _solve_egalitarian_least_core_quadratic_program(
            subsidy,
            A_eq=A_eq,
            b_eq=b_eq,
            A_lb=A_lb,
            b_lb=b_lb,
            solver_options=solver_options,
        )

        if values is None:
            logger.debug("No values were found")
            status = Status.Failed
            values = np.empty(n_obs)
            values[:] = np.nan
            subsidy = np.nan
        else:
            status = Status.Converged

    return ValuationResult(
        algorithm=algorithm,
        status=status,
        values=values,
        subsidy=subsidy,
        stderr=None,
        data_names=u.training_data.data_names,
    )