Skip to content

Common

lc_solve_problem(problem, *, u, algorithm, non_negative_subsidy=False, solver_options=None, **options)

Solves a linear problem as prepared by mclc_prepare_problem(). Useful for parallel execution of multiple experiments by running this as a remote task.

See exact_least_core() or montecarlo_least_core() for argument descriptions.

Source code in src/pydvl/value/least_core/common.py
def lc_solve_problem(
    problem: LeastCoreProblem,
    *,
    u: Utility,
    algorithm: str,
    non_negative_subsidy: bool = False,
    solver_options: Optional[dict] = None,
    **options,
) -> ValuationResult:
    """Solves a linear problem as prepared by
    [mclc_prepare_problem()][pydvl.value.least_core.montecarlo.mclc_prepare_problem].
    Useful for parallel execution of multiple experiments by running this as a
    remote task.

    See [exact_least_core()][pydvl.value.least_core.naive.exact_least_core] or
    [montecarlo_least_core()][pydvl.value.least_core.montecarlo.montecarlo_least_core] for
    argument descriptions.

    Args:
        problem: Least Core problem providing the lower bound constraint
            matrix `A_lb` and the matching `utility_values`.
        u: Utility. Its data determines the number of values computed and the
            names attached to the result. It is only evaluated (on all
            indices) when `problem` contains no row for the full set.
        algorithm: Name of the valuation algorithm, stored in the result.
        non_negative_subsidy: Forwarded to the linear program solver.
        solver_options: Options for the solver. If no `"solver"` key is given,
            `cp.SCS` is used, and for `cp.SCS` `"max_iters"` defaults to 10000.
            The dictionary passed by the caller is never modified.
        options: Deprecated way of passing solver options as keyword
            arguments. They take precedence over entries in `solver_options`.

    Returns:
        A result with status `Status.Converged` when both the linear and the
        quadratic program return a solution. Otherwise the status is
        `Status.Failed` and both the values and the subsidy are NaN.
    """
    n = len(u.data)

    nan_count = np.sum(np.isnan(problem.utility_values))
    if nan_count > 0:
        warnings.warn(
            f"Calculation returned "
            f"{nan_count} NaN "
            f"values out of {problem.utility_values.size}",
            RuntimeWarning,
        )

    # Work on a private copy: the same dictionary may be shared between
    # several calls (e.g. from lc_solve_problems), so filling in defaults or
    # merging deprecated kwargs must not leak back to the caller.
    solver_options = {} if solver_options is None else dict(solver_options)

    # TODO: remove this before releasing version 0.7.0
    if options:
        warnings.warn(
            DeprecationWarning(
                "Passing solver options as kwargs was deprecated in "
                "0.6.0, will be removed in 0.7.0. `Use solver_options` "
                "instead."
            )
        )
        solver_options.update(options)

    if "solver" not in solver_options:
        solver_options["solver"] = cp.SCS

    if "max_iters" not in solver_options and solver_options["solver"] == cp.SCS:
        solver_options["max_iters"] = 10000

    logger.debug("Removing possible duplicate values in lower bound array")
    A_lb, unique_indices = np.unique(problem.A_lb, return_index=True, axis=0)
    b_lb = problem.utility_values[unique_indices]

    logger.debug("Building equality constraint")
    A_eq = np.ones((1, n))
    # We might have already computed the total utility one or more times.
    # This is the index of the row(s) in A_lb with all ones.
    total_utility_indices = np.where(A_lb.sum(axis=1) == n)[0]
    if len(total_utility_indices) == 0:
        b_eq = np.array([u(u.data.indices)])
    else:
        b_eq = b_lb[total_utility_indices]
        # Remove the row(s) corresponding to the total utility
        # from the lower bound constraints
        # because given the equality constraint
        # it is the same as using the constraint e >= 0
        # (i.e. setting non_negative_subsidy = True).
        mask: NDArray[np.bool_] = np.ones_like(b_lb, dtype=bool)
        mask[total_utility_indices] = False
        b_lb = b_lb[mask]
        A_lb = A_lb[mask]

    # Remove the row(s) corresponding to the empty subset
    # because, given u(∅) = 0 (which is almost always the case),
    # it is the same as using the constraint e >= 0
    # (i.e. setting non_negative_subsidy = True).
    emptyset_utility_indices = np.where(A_lb.sum(axis=1) == 0)[0]
    if len(emptyset_utility_indices) > 0:
        mask = np.ones_like(b_lb, dtype=bool)
        mask[emptyset_utility_indices] = False
        b_lb = b_lb[mask]
        A_lb = A_lb[mask]

    _, subsidy = _solve_least_core_linear_program(
        A_eq=A_eq,
        b_eq=b_eq,
        A_lb=A_lb,
        b_lb=b_lb,
        non_negative_subsidy=non_negative_subsidy,
        solver_options=solver_options,
    )

    values: Optional[NDArray[np.float_]] = None

    # The quadratic program needs the subsidy found by the linear program,
    # so it is only attempted when the first stage succeeded.
    if subsidy is not None:
        values = _solve_egalitarian_least_core_quadratic_program(
            subsidy,
            A_eq=A_eq,
            b_eq=b_eq,
            A_lb=A_lb,
            b_lb=b_lb,
            solver_options=solver_options,
        )

    if values is None:
        # Either stage failed: report NaN for every value and the subsidy.
        logger.debug("No values were found")
        status = Status.Failed
        values = np.full(n, np.nan)
        subsidy = np.nan
    else:
        status = Status.Converged

    return ValuationResult(
        algorithm=algorithm,
        status=status,
        values=values,
        subsidy=subsidy,
        stderr=None,
        data_names=u.data.data_names,
    )

lc_solve_problems(problems, u, algorithm, config=ParallelConfig(), n_jobs=1, non_negative_subsidy=True, solver_options=None, **options)

Solves a list of linear problems in parallel.

PARAMETER DESCRIPTION
u

Utility.

TYPE: Utility

problems

Least Core problems to solve, as returned by mclc_prepare_problem().

TYPE: Sequence[LeastCoreProblem]

algorithm

Name of the valuation algorithm.

TYPE: str

config

Object configuring parallel computation, with cluster address, number of cpus, etc.

TYPE: ParallelConfig DEFAULT: ParallelConfig()

n_jobs

Number of parallel jobs to run.

TYPE: int DEFAULT: 1

non_negative_subsidy

If True, the least core subsidy \(e\) is constrained to be non-negative.

TYPE: bool DEFAULT: True

solver_options

Additional options to pass to the solver.

TYPE: Optional[dict] DEFAULT: None

RETURNS DESCRIPTION
List[ValuationResult]

List of solutions.

Source code in src/pydvl/value/least_core/common.py
def lc_solve_problems(
    problems: Sequence[LeastCoreProblem],
    u: Utility,
    algorithm: str,
    config: ParallelConfig = ParallelConfig(),
    n_jobs: int = 1,
    non_negative_subsidy: bool = True,
    solver_options: Optional[dict] = None,
    **options,
) -> List[ValuationResult]:
    """Solves a list of linear problems in parallel.

    The problems are handed to a map-reduce job. Every batch received by the
    map step is solved one problem at a time with `lc_solve_problem()`, and
    the reduce step concatenates the per-batch results into a single list.

    Args:
        u: Utility.
        problems: Least Core problems to solve, as returned by
            [mclc_prepare_problem()][pydvl.value.least_core.montecarlo.mclc_prepare_problem].
        algorithm: Name of the valuation algorithm.
        config: Object configuring parallel computation, with cluster address,
            number of cpus, etc.
        n_jobs: Number of parallel jobs to run.
        non_negative_subsidy: If True, the least core subsidy $e$ is constrained
            to be non-negative.
        solver_options: Additional options to pass to the solver.
        options: Extra keyword arguments forwarded unchanged to
            `lc_solve_problem()`, which treats them as solver options and
            emits a deprecation warning for them.

    Returns:
        List of solutions.
    """

    def _solve_batch(
        batch: List[LeastCoreProblem], *args, **kwargs
    ) -> List[ValuationResult]:
        # Solve every problem of the batch sequentially, keeping their order.
        batch_results = []
        for single_problem in batch:
            batch_results.append(lc_solve_problem(single_problem, *args, **kwargs))
        return batch_results

    def _flatten(per_batch_results) -> List[ValuationResult]:
        # Concatenate the lists returned by the map step into one flat list.
        return list(itertools.chain.from_iterable(per_batch_results))

    # Keyword arguments shared by every call to lc_solve_problem().
    shared_kwargs = {
        "u": u,
        "algorithm": algorithm,
        "non_negative_subsidy": non_negative_subsidy,
        "solver_options": solver_options,
        **options,
    }

    job: MapReduceJob["LeastCoreProblem", "List[ValuationResult]"] = MapReduceJob(
        inputs=problems,
        map_func=_solve_batch,
        map_kwargs=shared_kwargs,
        reduce_func=_flatten,
        config=config,
        n_jobs=n_jobs,
    )
    return job()

Last update: 2023-10-14
Created: 2023-10-14