
pydvl.parallel

This module provides a common interface to parallelization backends. The supported backends, JoblibParallelBackend and RayParallelBackend, are documented below. Backends should be instantiated directly and passed to the respective valuation method.

We use executors that implement the Executor interface to submit tasks in parallel. The basic high-level pattern is:

from pydvl.parallel import JoblibParallelBackend

parallel_backend = JoblibParallelBackend()
with parallel_backend.executor(max_workers=2) as executor:
    future = executor.submit(lambda x: x + 1, 1)
    result = future.result()
assert result == 2

Running a map-style job is also easy:

from pydvl.parallel import JoblibParallelBackend

parallel_backend = JoblibParallelBackend()
with parallel_backend.executor(max_workers=2) as executor:
    results = list(executor.map(lambda x: x + 1, range(5)))
assert results == [1, 2, 3, 4, 5]
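The executors returned by a backend follow the standard concurrent.futures.Executor interface, so the submit/map pattern above can be tried without pydvl. As a minimal sketch, here is the same pattern with the standard library's ThreadPoolExecutor. It is used only because it needs no extra dependencies and is not one of pydvl's backends.

```python
from concurrent.futures import ThreadPoolExecutor


def add_one(x: int) -> int:
    return x + 1


# submit() schedules a single call and returns a Future
with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(add_one, 1)
    result = future.result()

# map() applies the callable to every element and preserves input order
with ThreadPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(add_one, range(5)))

print(result, results)
```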

Passing large objects

When running tasks that accept heavy inputs, first call put() on the object and pass the returned reference as the argument to the callable in submit(). For example:

u_ref = parallel_backend.put(u)
...
executor.submit(task, utility=u_ref)
Note that task() does not need to be changed in any way: the backend will get() the object and pass it to the function upon invocation.
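The put()/get() mechanism can be illustrated with a toy, in-process object store. Everything below (ObjectRef, ToyBackend and its submit() helper) is hypothetical and only mimics the behaviour described above: the task receives the dereferenced object, not the reference. In a real distributed backend the dereferencing happens on the worker; here it happens just before submission, which is enough to show that task() itself needs no changes.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass(frozen=True)
class ObjectRef:
    """Hypothetical handle to an object held by the backend."""

    key: int


class ToyBackend:
    """Hypothetical backend with an in-memory object store."""

    def __init__(self) -> None:
        self._store: Dict[int, Any] = {}

    def put(self, obj: Any) -> ObjectRef:
        # Store the object once and hand out a lightweight reference
        key = len(self._store)
        self._store[key] = obj
        return ObjectRef(key)

    def get(self, ref: Any) -> Any:
        # Dereference ObjectRefs, pass any other value through unchanged
        return self._store[ref.key] if isinstance(ref, ObjectRef) else ref

    def submit(
        self, executor: ThreadPoolExecutor, fn: Callable, *args: Any, **kwargs: Any
    ) -> Future:
        # Resolve references before invocation, so fn only ever sees real objects
        resolved_args = [self.get(a) for a in args]
        resolved_kwargs = {k: self.get(v) for k, v in kwargs.items()}
        return executor.submit(fn, *resolved_args, **resolved_kwargs)


def task(utility: list) -> int:
    # Unchanged task: it works on the object itself
    return sum(utility)


backend = ToyBackend()
u_ref = backend.put(list(range(1000)))  # stands in for a heavy object
with ThreadPoolExecutor(max_workers=2) as executor:
    total = backend.submit(executor, task, utility=u_ref).result()
```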

There is also an alternative map-reduce implementation, MapReduceJob, which internally uses joblib's higher-level Parallel() API and thereby indirectly supports Dask and Ray as well.

BaseModel

Bases: Protocol

This is the minimal model protocol, with the method fit().

fit

fit(x: NDArray, y: NDArray | None)

Fit the model to the data

PARAMETER DESCRIPTION
x

Independent variables

TYPE: NDArray

y

Dependent variable

TYPE: NDArray | None

Source code in src/pydvl/utils/types.py
def fit(self, x: NDArray, y: NDArray | None):
    """Fit the model to the data

    Args:
        x: Independent variables
        y: Dependent variable
    """
    pass

SupervisedModel

Bases: Protocol

This is the standard sklearn protocol, with the methods fit(), predict() and score().

fit

fit(x: NDArray, y: NDArray | None)

Fit the model to the data

PARAMETER DESCRIPTION
x

Independent variables

TYPE: NDArray

y

Dependent variable

TYPE: NDArray | None

Source code in src/pydvl/utils/types.py
def fit(self, x: NDArray, y: NDArray | None):
    """Fit the model to the data

    Args:
        x: Independent variables
        y: Dependent variable
    """
    pass

predict

predict(x: NDArray) -> NDArray

Compute predictions for the input

PARAMETER DESCRIPTION
x

Independent variables for which to compute predictions

TYPE: NDArray

RETURNS DESCRIPTION
NDArray

Predictions for the input

Source code in src/pydvl/utils/types.py
def predict(self, x: NDArray) -> NDArray:
    """Compute predictions for the input

    Args:
        x: Independent variables for which to compute predictions

    Returns:
        Predictions for the input
    """
    pass

score

score(x: NDArray, y: NDArray | None) -> float

Compute the score of the model given test data

PARAMETER DESCRIPTION
x

Independent variables

TYPE: NDArray

y

Dependent variable

TYPE: NDArray | None

RETURNS DESCRIPTION
float

The score of the model on (x, y)

Source code in src/pydvl/utils/types.py
def score(self, x: NDArray, y: NDArray | None) -> float:
    """Compute the score of the model given test data

    Args:
        x: Independent variables
        y: Dependent variable

    Returns:
        The score of the model on `(x, y)`
    """
    pass
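Because SupervisedModel is a typing.Protocol, any class with matching fit(), predict() and score() methods satisfies it without inheriting from it (and, having fit(), it also satisfies BaseModel). A minimal sketch: the protocol is re-declared locally for illustration, and MeanRegressor is a hypothetical model that always predicts the mean of the training targets.

```python
from typing import Optional, Protocol

import numpy as np
from numpy.typing import NDArray


class SupervisedModel(Protocol):
    # Local re-declaration of the protocol above, for illustration only
    def fit(self, x: NDArray, y: Optional[NDArray]) -> None: ...

    def predict(self, x: NDArray) -> NDArray: ...

    def score(self, x: NDArray, y: Optional[NDArray]) -> float: ...


class MeanRegressor:
    """Hypothetical model that predicts the mean of the training targets."""

    def __init__(self) -> None:
        self.mean_: float = 0.0

    def fit(self, x: NDArray, y: Optional[NDArray]) -> None:
        if y is None:
            raise ValueError("MeanRegressor needs targets")
        self.mean_ = float(np.mean(y))

    def predict(self, x: NDArray) -> NDArray:
        return np.full(len(x), self.mean_)

    def score(self, x: NDArray, y: Optional[NDArray]) -> float:
        # Negative mean squared error, so that higher is better
        if y is None:
            raise ValueError("MeanRegressor needs targets")
        return -float(np.mean((self.predict(x) - y) ** 2))


# Structural typing: a static type checker accepts this without inheritance
model: SupervisedModel = MeanRegressor()
x = np.arange(4.0).reshape(-1, 1)
y = np.array([1.0, 2.0, 3.0, 4.0])
model.fit(x, y)
```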

BaggingModel

Bases: Protocol

Any model with the attributes n_estimators and max_samples is considered a bagging model.

fit

fit(x: NDArray, y: NDArray | None)

Fit the model to the data

PARAMETER DESCRIPTION
x

Independent variables

TYPE: NDArray

y

Dependent variable

TYPE: NDArray | None

Source code in src/pydvl/utils/types.py
def fit(self, x: NDArray, y: NDArray | None):
    """Fit the model to the data

    Args:
        x: Independent variables
        y: Dependent variable
    """
    pass

predict

predict(x: NDArray) -> NDArray

Compute predictions for the input

PARAMETER DESCRIPTION
x

Independent variables for which to compute predictions

TYPE: NDArray

RETURNS DESCRIPTION
NDArray

Predictions for the input

Source code in src/pydvl/utils/types.py
def predict(self, x: NDArray) -> NDArray:
    """Compute predictions for the input

    Args:
        x: Independent variables for which to compute predictions

    Returns:
        Predictions for the input
    """
    pass
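Since membership in BaggingModel is structural, a runtime check only needs to look for the two attributes (sklearn's bagging ensembles, such as BaggingClassifier, expose both). A minimal sketch, assuming the attributes alone are what matters; the helper is_bagging_model and both toy classes are hypothetical.

```python
from typing import Any


def is_bagging_model(model: Any) -> bool:
    """Hypothetical helper: duck-typed check for the two bagging attributes."""
    return hasattr(model, "n_estimators") and hasattr(model, "max_samples")


class ToyBagging:
    # Hypothetical stand-in for a bagging ensemble
    def __init__(self, n_estimators: int = 10, max_samples: float = 0.5) -> None:
        self.n_estimators = n_estimators
        self.max_samples = max_samples


class NotBagging:
    # Hypothetical model without the bagging attributes
    pass
```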

CancellationPolicy

Bases: Flag

Policy to use when cancelling futures after exiting an Executor.

Note

Not all backends support all policies.

ATTRIBUTE DESCRIPTION
NONE

Do not cancel any futures.

PENDING

Cancel all pending futures, but not running ones.

RUNNING

Cancel all running futures, but not pending ones.

ALL

Cancel all pending and running futures.
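Because the policy is a Flag, members can be combined and tested with bitwise operators. The re-declaration below is a sketch that assumes ALL is the union of PENDING and RUNNING (the actual member values are not listed above), and the helpers cancels_pending and cancels_running are hypothetical. For comparison, the standard library's Executor.shutdown(cancel_futures=True) (Python 3.9+) only cancels futures that have not started running, which corresponds to PENDING.

```python
from enum import Flag


class CancellationPolicy(Flag):
    # Sketch only: explicit values, assuming ALL == PENDING | RUNNING
    NONE = 0
    PENDING = 1
    RUNNING = 2
    ALL = PENDING | RUNNING


def cancels_pending(policy: CancellationPolicy) -> bool:
    return bool(policy & CancellationPolicy.PENDING)


def cancels_running(policy: CancellationPolicy) -> bool:
    return bool(policy & CancellationPolicy.RUNNING)
```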

ParallelBackend

Abstract base class for all parallel backends.

executor abstractmethod classmethod

executor(
    max_workers: int | None = None,
    *,
    config: ParallelConfig | None = None,
    cancel_futures: CancellationPolicy | bool = PENDING,
) -> Executor

Returns a futures executor for the parallel backend.

Source code in src/pydvl/parallel/backend.py
@classmethod
@abstractmethod
def executor(
    cls,
    max_workers: int | None = None,
    *,
    config: ParallelConfig | None = None,
    cancel_futures: CancellationPolicy | bool = CancellationPolicy.PENDING,
) -> Executor:
    """Returns a futures executor for the parallel backend."""
    ...
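The only contract shown here is a classmethod returning a futures Executor. The following self-contained sketch mirrors just that part with a hypothetical MinimalBackend base class and a thread-pool implementation. The real ParallelBackend declares more than this (its subclasses also provide wrap(), for instance), so this is not a drop-in subclass.

```python
from abc import ABC, abstractmethod
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Optional


class MinimalBackend(ABC):
    """Hypothetical, stripped-down analogue of ParallelBackend (executor only)."""

    @classmethod
    @abstractmethod
    def executor(cls, max_workers: Optional[int] = None) -> Executor: ...


class ThreadBackend(MinimalBackend):
    """Hypothetical backend that runs tasks in a local thread pool."""

    @classmethod
    def executor(cls, max_workers: Optional[int] = None) -> Executor:
        return ThreadPoolExecutor(max_workers=max_workers)


# The executor is a context manager, as with the real backends
with ThreadBackend.executor(max_workers=2) as ex:
    squares = list(ex.map(lambda v: v * v, range(4)))
```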

JoblibParallelBackend

JoblibParallelBackend(config: ParallelConfig | None = None)

Bases: ParallelBackend

Class used to wrap joblib to make it transparent to algorithms.

Example

from pydvl.parallel import JoblibParallelBackend
parallel_backend = JoblibParallelBackend()
Source code in src/pydvl/parallel/backends/joblib.py
@deprecated(
    target=True,
    args_mapping={"config": None},
    deprecated_in="0.9.0",
    remove_in="0.10.0",
)
def __init__(self, config: ParallelConfig | None = None) -> None:
    n_jobs: int | None = None
    if config is not None:
        n_jobs = config.n_cpus_local
    self.config = {
        "n_jobs": n_jobs,
    }

executor classmethod

executor(
    max_workers: int | None = None,
    *,
    config: ParallelConfig | None = None,
    cancel_futures: CancellationPolicy | bool = NONE,
) -> Executor

Returns a futures executor for the parallel backend.

Example

from pydvl.parallel import JoblibParallelBackend
parallel_backend = JoblibParallelBackend()
with parallel_backend.executor() as executor:
    executor.submit(...)
PARAMETER DESCRIPTION
max_workers

Maximum number of parallel workers.

TYPE: int | None DEFAULT: None

config

(DEPRECATED) Object configuring parallel computation, with cluster address, number of cpus, etc.

TYPE: ParallelConfig | None DEFAULT: None

cancel_futures

Policy to use when cancelling futures after exiting an Executor.

TYPE: CancellationPolicy | bool DEFAULT: NONE

RETURNS DESCRIPTION
Executor

Instance of _ReusablePoolExecutor (from joblib.externals.loky.reusable_executor).

Source code in src/pydvl/parallel/backends/joblib.py
@classmethod
def executor(
    cls,
    max_workers: int | None = None,
    *,
    config: ParallelConfig | None = None,
    cancel_futures: CancellationPolicy | bool = CancellationPolicy.NONE,
) -> Executor:
    """Returns a futures executor for the parallel backend.

    !!! Example
        ``` python
        from pydvl.parallel import JoblibParallelBackend
        parallel_backend = JoblibParallelBackend()
        with parallel_backend.executor() as executor:
            executor.submit(...)
        ```

    Args:
        max_workers: Maximum number of parallel workers.
        config: (**DEPRECATED**) Object configuring parallel computation,
            with cluster address, number of cpus, etc.
        cancel_futures: Policy to use when cancelling futures
            after exiting an Executor.

    Returns:
        Instance of [_ReusablePoolExecutor][joblib.externals.loky.reusable_executor].
    """
    if config is not None:
        warnings.warn(
            "The `JoblibParallelBackend` uses deprecated arguments: "
            "`config`. They were deprecated since v0.9.0 "
            "and will be removed in v0.10.0.",
            FutureWarning,
        )

    if cancel_futures not in (CancellationPolicy.NONE, False):
        warnings.warn(
            "Cancellation of futures is not supported by the joblib backend",
        )
    return cast(Executor, get_reusable_executor(max_workers=max_workers))

wrap

wrap(fun: Callable, **kwargs) -> Callable

Wraps a function with joblib's delayed(). Any additional keyword arguments are ignored.

PARAMETER DESCRIPTION
fun

the function to wrap

TYPE: Callable

RETURNS DESCRIPTION
Callable

The delayed function.

Source code in src/pydvl/parallel/backends/joblib.py
def wrap(self, fun: Callable, **kwargs) -> Callable:
    """Wraps a function as a joblib delayed.

    Args:
        fun: the function to wrap

    Returns:
        The delayed function.
    """
    return delayed(fun)  # type: ignore
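joblib's delayed() does not run the function: it captures the call as a (function, args, kwargs) tuple that Parallel dispatches later. The following is a simplified re-implementation for illustration only; my_delayed and run_all are hypothetical and omit everything joblib does beyond capturing and replaying calls.

```python
import functools
from typing import Any, Callable, Dict, List, Tuple

Call = Tuple[Callable[..., Any], Tuple[Any, ...], Dict[str, Any]]


def my_delayed(function: Callable[..., Any]) -> Callable[..., Call]:
    """Simplified stand-in for joblib.delayed: capture the call, do not run it."""

    @functools.wraps(function)
    def delayed_function(*args: Any, **kwargs: Any) -> Call:
        return function, args, kwargs

    return delayed_function


def run_all(calls: List[Call]) -> List[Any]:
    # What a sequential Parallel-like runner does with the captured calls
    return [f(*args, **kwargs) for f, args, kwargs in calls]


def square(v: int) -> int:
    return v * v


calls = [my_delayed(square)(base) for base in range(4)]
```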

RayParallelBackend

RayParallelBackend(config: ParallelConfig | None = None)

Bases: ParallelBackend

Class used to wrap ray to make it transparent to algorithms.

Example

import ray
from pydvl.parallel import RayParallelBackend
ray.init()
parallel_backend = RayParallelBackend()
Source code in src/pydvl/parallel/backends/ray.py
@deprecated(
    target=True,
    args_mapping={"config": None},
    deprecated_in="0.9.0",
    remove_in="0.10.0",
)
def __init__(self, config: ParallelConfig | None = None) -> None:
    if not ray.is_initialized():
        raise RuntimeError(
            "Starting from v0.9.0, ray is no longer automatically initialized. "
            "Please use `ray.init()` with the desired configuration "
            "before using this class."
        )
    # Register ray joblib backend
    register_ray()

executor classmethod

executor(
    max_workers: int | None = None,
    *,
    config: ParallelConfig | None = None,
    cancel_futures: CancellationPolicy | bool = PENDING,
) -> Executor

Returns a futures executor for the parallel backend.

Example

import ray
from pydvl.parallel import RayParallelBackend
ray.init()
parallel_backend = RayParallelBackend()
with parallel_backend.executor() as executor:
    executor.submit(...)
PARAMETER DESCRIPTION
max_workers

Maximum number of parallel workers.

TYPE: int | None DEFAULT: None

config

(DEPRECATED) Object configuring parallel computation, with cluster address, number of cpus, etc.

TYPE: ParallelConfig | None DEFAULT: None

cancel_futures

Policy to use when cancelling futures after exiting an Executor.

TYPE: CancellationPolicy | bool DEFAULT: PENDING

RETURNS DESCRIPTION
Executor

Instance of RayExecutor.

Source code in src/pydvl/parallel/backends/ray.py
@classmethod
def executor(
    cls,
    max_workers: int | None = None,
    *,
    config: ParallelConfig | None = None,
    cancel_futures: CancellationPolicy | bool = CancellationPolicy.PENDING,
) -> Executor:
    """Returns a futures executor for the parallel backend.

    !!! Example
        ``` python
        import ray
        from pydvl.parallel import RayParallelBackend
        ray.init()
        parallel_backend = RayParallelBackend()
        with parallel_backend.executor() as executor:
            executor.submit(...)
        ```

    Args:
        max_workers: Maximum number of parallel workers.
        config: (**DEPRECATED**) Object configuring parallel computation,
            with cluster address, number of cpus, etc.
        cancel_futures: Policy to use when cancelling futures
            after exiting an Executor.

    Returns:
        Instance of [RayExecutor][pydvl.parallel.futures.ray.RayExecutor].
    """
    # Imported here to avoid circular import errors
    from pydvl.parallel.futures.ray import RayExecutor

    if config is not None:
        warnings.warn(
            "The `RayParallelBackend` uses deprecated arguments: "
            "`config`. They were deprecated since v0.9.0 "
            "and will be removed in v0.10.0.",
            FutureWarning,
        )

    return RayExecutor(max_workers, cancel_futures=cancel_futures)  # type: ignore

wrap

wrap(fun: Callable, **kwargs: dict[str, Any]) -> Callable

Wraps a function as a ray remote.

PARAMETER DESCRIPTION
fun

the function to wrap

TYPE: Callable

kwargs

keyword arguments to pass to @ray.remote

TYPE: dict[str, Any] DEFAULT: {}

RETURNS DESCRIPTION
Callable

The .remote method of the ray RemoteFunction.

Source code in src/pydvl/parallel/backends/ray.py
def wrap(self, fun: Callable, **kwargs: dict[str, Any]) -> Callable:
    """Wraps a function as a ray remote.

    Args:
        fun: the function to wrap
        kwargs: keyword arguments to pass to @ray.remote

    Returns:
        The `.remote` method of the ray `RemoteFunction`.
    """
    if len(kwargs) > 0:
        return ray.remote(**kwargs)(fun).remote  # type: ignore
    return ray.remote(fun).remote  # type: ignore

ParallelConfig dataclass

ParallelConfig(
    backend: Literal["joblib", "ray"] = "joblib",
    address: Optional[Union[str, Tuple[str, int]]] = None,
    n_cpus_local: Optional[int] = None,
    logging_level: Optional[int] = None,
    wait_timeout: float = 1.0,
)

Configuration for parallel computation backend.

PARAMETER DESCRIPTION
backend

Type of backend to use. Defaults to 'joblib'.

TYPE: Literal['joblib', 'ray'] DEFAULT: 'joblib'

address

(DEPRECATED) Address of existing remote or local cluster to use.

TYPE: Optional[Union[str, Tuple[str, int]]] DEFAULT: None

n_cpus_local

(DEPRECATED) Number of CPUs to use when creating a local ray cluster. This has no effect when using an existing ray cluster.

TYPE: Optional[int] DEFAULT: None

logging_level

(DEPRECATED) Logging level for the parallel backend's worker.

TYPE: Optional[int] DEFAULT: None

wait_timeout

(DEPRECATED) Timeout in seconds for waiting on futures.

TYPE: float DEFAULT: 1.0

MapReduceJob

MapReduceJob(
    inputs: Union[Collection[T], T],
    map_func: MapFunction[R],
    reduce_func: ReduceFunction[R] = identity,
    parallel_backend: Optional[ParallelBackend] = None,
    config: Optional[ParallelConfig] = None,
    *,
    map_kwargs: Optional[Dict] = None,
    reduce_kwargs: Optional[Dict] = None,
    n_jobs: int = -1,
    timeout: Optional[float] = None,
)

Bases: Generic[T, R]

Takes an embarrassingly parallel function and runs it in n_jobs parallel jobs, splitting the data evenly into as many chunks as there are jobs.

Objects of this class are generic in two types: the type of the inputs that are split for map_func, and the type of its output.

PARAMETER DESCRIPTION
inputs

The input that will be split and passed to map_func. If it is not a sequence object, it will be repeated n_jobs times.

TYPE: Union[Collection[T], T]

map_func

Function that will be applied to the input chunks in each job.

TYPE: MapFunction[R]

reduce_func

Function that will be applied to the results of map_func to reduce them.

TYPE: ReduceFunction[R] DEFAULT: identity

map_kwargs

Keyword arguments that will be passed to map_func in each job. Alternatively, one can use functools.partial.

TYPE: Optional[Dict] DEFAULT: None

reduce_kwargs

Keyword arguments that will be passed to reduce_func. Alternatively, one can use functools.partial.

TYPE: Optional[Dict] DEFAULT: None

parallel_backend

Parallel backend instance to use for parallelizing computations. If None, JoblibParallelBackend is used. See the Parallel Backends package for available options.

TYPE: Optional[ParallelBackend] DEFAULT: None

config

(DEPRECATED) Object configuring parallel computation, with cluster address, number of cpus, etc.

TYPE: Optional[ParallelConfig] DEFAULT: None

n_jobs

Number of parallel jobs to run. Does not accept 0.

TYPE: int DEFAULT: -1

Example

A simple usage example with 2 jobs:

>>> from pydvl.parallel import MapReduceJob
>>> import numpy as np
>>> map_reduce_job: MapReduceJob[np.ndarray, np.ndarray] = MapReduceJob(
...     np.arange(5),
...     map_func=np.sum,
...     reduce_func=np.sum,
...     n_jobs=2,
... )
>>> map_reduce_job()
10

When passed a single object as input, it will be repeated for each job:

>>> from pydvl.parallel import MapReduceJob
>>> import numpy as np
>>> map_reduce_job: MapReduceJob[int, np.ndarray] = MapReduceJob(
...     5,
...     map_func=lambda x: np.array([x]),
...     reduce_func=np.sum,
...     n_jobs=2,
... )
>>> map_reduce_job()
10
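The split-map-reduce behaviour in these examples can be sketched with the standard library and numpy. This is not pydvl's implementation (which uses joblib's Parallel and a private _chunkify method); chunkify and map_reduce are hypothetical helpers that only reproduce the documented behaviour: sequences are split into n_jobs roughly equal chunks, while a single object is repeated once per job.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List

import numpy as np


def chunkify(inputs: Any, n_chunks: int) -> List[Any]:
    """Hypothetical helper: split sequences evenly, repeat anything else."""
    if isinstance(inputs, (np.ndarray, list, tuple)):
        chunks = np.array_split(np.asarray(inputs), n_chunks)
        return [c for c in chunks if len(c) > 0]  # drop empty chunks
    return [inputs] * n_chunks


def map_reduce(
    inputs: Any, map_func: Callable, reduce_func: Callable, n_jobs: int
) -> Any:
    chunks = chunkify(inputs, n_jobs)
    # Map step: one task per chunk, results kept in chunk order
    with ThreadPoolExecutor(max_workers=n_jobs) as executor:
        map_results = list(executor.map(map_func, chunks))
    # Reduce step: a single call on the list of partial results
    return reduce_func(map_results)


total = map_reduce(np.arange(5), np.sum, np.sum, n_jobs=2)
repeated = map_reduce(5, lambda x: np.array([x]), np.sum, n_jobs=2)
```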

Source code in src/pydvl/parallel/map_reduce.py
@deprecated(
    target=True,
    args_mapping={"config": "config"},
    deprecated_in="0.9.0",
    remove_in="0.10.0",
)
def __init__(
    self,
    inputs: Union[Collection[T], T],
    map_func: MapFunction[R],
    reduce_func: ReduceFunction[R] = identity,
    parallel_backend: Optional[ParallelBackend] = None,
    config: Optional[ParallelConfig] = None,
    *,
    map_kwargs: Optional[Dict] = None,
    reduce_kwargs: Optional[Dict] = None,
    n_jobs: int = -1,
    timeout: Optional[float] = None,
):
    parallel_backend = _maybe_init_parallel_backend(parallel_backend, config)

    self.parallel_backend = parallel_backend

    self.timeout = timeout

    self._n_jobs = -1
    # This uses the setter defined below
    self.n_jobs = n_jobs

    self.inputs_ = inputs

    self.map_kwargs = map_kwargs if map_kwargs is not None else dict()
    self.reduce_kwargs = reduce_kwargs if reduce_kwargs is not None else dict()

    self._map_func = reduce(maybe_add_argument, ["job_id", "seed"], map_func)
    self._reduce_func = reduce_func

n_jobs property writable

n_jobs: int

Effective number of jobs according to the used ParallelBackend instance.

__call__

__call__(seed: Optional[Union[Seed, SeedSequence]] = None) -> R

Runs the map-reduce job.

PARAMETER DESCRIPTION
seed

Either an instance of a numpy random number generator or a seed for it.

TYPE: Optional[Union[Seed, SeedSequence]] DEFAULT: None

RETURNS DESCRIPTION
R

The result of the reduce function.

Source code in src/pydvl/parallel/map_reduce.py
def __call__(
    self,
    seed: Optional[Union[Seed, SeedSequence]] = None,
) -> R:
    """
    Runs the map-reduce job.

    Args:
        seed: Either an instance of a numpy random number generator or a seed for
            it.

    Returns:
         The result of the reduce function.
    """
    seed_seq = ensure_seed_sequence(seed)

    if hasattr(self.parallel_backend, "_joblib_backend_name"):
        backend = getattr(self.parallel_backend, "_joblib_backend_name")
    else:
        warnings.warn(
            "Parallel backend "
            f"{self.parallel_backend.__class__.__name__}. "
            "should have a `_joblib_backend_name` attribute in order to work "
            "property with MapReduceJob. "
            "Defaulting to joblib loky backend"
        )
        backend = "loky"

    with Parallel(backend=backend, prefer="processes") as parallel:
        chunks = self._chunkify(self.inputs_, n_chunks=self.n_jobs)
        map_results: List[R] = parallel(
            delayed(self._map_func)(
                next_chunk, job_id=j, seed=seed, **self.map_kwargs
            )
            for j, (next_chunk, seed) in enumerate(
                zip(chunks, seed_seq.spawn(len(chunks)))
            )
        )

    reduce_results: R = self._reduce_func(map_results, **self.reduce_kwargs)
    return reduce_results
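Each job receives its own child of a single SeedSequence via spawn(), so results are reproducible for a fixed seed while the jobs draw from independent random streams. A small numpy-only sketch of that idea; job and run are hypothetical stand-ins for the map function and the job runner.

```python
from typing import List

import numpy as np
from numpy.random import SeedSequence


def job(job_id: int, seed: SeedSequence) -> float:
    # Hypothetical per-job work: one draw from a job-specific generator
    rng = np.random.default_rng(seed)
    return float(rng.random())


def run(seed: int, n_jobs: int) -> List[float]:
    # One independent child sequence per job, derived from a single root seed
    children = SeedSequence(seed).spawn(n_jobs)
    return [job(j, s) for j, s in enumerate(children)]


first = run(seed=42, n_jobs=3)
second = run(seed=42, n_jobs=3)
```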

ensure_seed_sequence

ensure_seed_sequence(
    seed: Optional[Union[Seed, SeedSequence]] = None,
) -> SeedSequence

If the passed seed is a SeedSequence object, it is returned as is. If it is a Generator, its internal (protected) seed sequence is extracted. Otherwise, a new SeedSequence object is created from the passed (optional) seed.

PARAMETER DESCRIPTION
seed

Either an int, a Generator object, a SeedSequence object or None.

TYPE: Optional[Union[Seed, SeedSequence]] DEFAULT: None

RETURNS DESCRIPTION
SeedSequence

A SeedSequence object.

New in version 0.7.0

Source code in src/pydvl/utils/types.py
def ensure_seed_sequence(
    seed: Optional[Union[Seed, SeedSequence]] = None,
) -> SeedSequence:
    """
    If the passed seed is a SeedSequence object then it is returned as is. If it is
    a Generator the internal protected seed sequence from the generator gets extracted.
    Otherwise, a new SeedSequence object is created from the passed (optional) seed.

    Args:
        seed: Either an int, a Generator object a SeedSequence object or None.

    Returns:
        A SeedSequence object.

    !!! tip "New in version 0.7.0"
    """
    if isinstance(seed, SeedSequence):
        return seed
    elif isinstance(seed, Generator):
        return cast(SeedSequence, seed.bit_generator.seed_seq)  # type: ignore
    else:
        return SeedSequence(seed)
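The three branches can be checked with a local copy of this logic. This is a sketch for experimentation, not the library function: ensure_seed_sequence_sketch is hypothetical, and the fallback to the protected _seed_seq attribute is an assumption added for numpy versions that lack the public seed_seq property.

```python
from typing import Optional, Union

import numpy as np
from numpy.random import Generator, SeedSequence


def ensure_seed_sequence_sketch(
    seed: Optional[Union[int, Generator, SeedSequence]] = None,
) -> SeedSequence:
    # Branch 1: already a SeedSequence, return it unchanged
    if isinstance(seed, SeedSequence):
        return seed
    # Branch 2: a Generator, reuse the seed sequence of its bit generator
    if isinstance(seed, Generator):
        bit_gen = seed.bit_generator
        # Public `seed_seq` on recent numpy, protected `_seed_seq` otherwise
        return getattr(bit_gen, "seed_seq", None) or bit_gen._seed_seq
    # Branch 3: an int or None, build a fresh SeedSequence
    return SeedSequence(seed)


existing = SeedSequence(3)
```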

init_parallel_backend

init_parallel_backend(
    config: ParallelConfig | None = None, backend_name: str | None = None
) -> ParallelBackend

Initializes the parallel backend and returns an instance of it.

If no arguments are passed, a JoblibParallelBackend with the default configuration (a local joblib backend) is instantiated:

Example
parallel_backend = init_parallel_backend()

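To create a parallel backend instance with, for example, ray as the backend, pass the backend name as a string. Note that ray must already have been initialized with ray.init(), otherwise RayParallelBackend raises a RuntimeError: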

Example
parallel_backend = init_parallel_backend(backend_name="ray")

The following is an example of the deprecated way of instantiating a parallel backend:

Example
config = ParallelConfig()
parallel_backend = init_parallel_backend(config)
PARAMETER DESCRIPTION
backend_name

Name of the backend to instantiate.

TYPE: str | None DEFAULT: None

config

(DEPRECATED) Object configuring parallel computation, with cluster address, number of cpus, etc.

TYPE: ParallelConfig | None DEFAULT: None

Source code in src/pydvl/parallel/backend.py
@deprecated(
    target=True,
    args_mapping={"config": "config"},
    deprecated_in="0.9.0",
    remove_in="0.10.0",
)
def init_parallel_backend(
    config: ParallelConfig | None = None, backend_name: str | None = None
) -> ParallelBackend:
    """Initializes the parallel backend and returns an instance of it.

    The following example creates a parallel backend instance with the default
    configuration, which is a local joblib backend.

    If you don't pass any arguments, then by default it will instantiate
    the JoblibParallelBackend:

    ??? Example
        ```python
        parallel_backend = init_parallel_backend()
        ```

    To create a parallel backend instance with for example `ray` as a backend,
    you can pass the backend name as a string:.

    ??? Example
        ```python
        parallel_backend = init_parallel_backend(backend_name="ray")
        ```


    The following is an example of the deprecated
    way for instantiating a parallel backend:

    ??? Example
        ``` python
        config = ParallelConfig()
        parallel_backend = init_parallel_backend(config)
        ```

    Args:
        backend_name: Name of the backend to instantiate.
        config: (**DEPRECATED**) Object configuring parallel computation,
            with cluster address, number of cpus, etc.


    """
    if backend_name is None:
        if config is None:
            backend_name = "joblib"
        else:
            backend_name = config.backend

    try:
        parallel_backend_cls = ParallelBackend.BACKENDS[backend_name]
    except KeyError:
        raise NotImplementedError(f"Unexpected parallel backend {backend_name}")
    return parallel_backend_cls(config)  # type: ignore
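Backend selection boils down to a dictionary lookup keyed by name, with unknown names turned into NotImplementedError. A self-contained sketch of that pattern: pydvl keeps its registry in ParallelBackend.BACKENDS, but how that dictionary is populated is not shown here, so the registration via __init_subclass__ and the classes Backend, JoblibLike and RayLike are assumptions for illustration.

```python
from typing import Dict, Optional, Type


class Backend:
    """Hypothetical base class; subclasses register themselves by name."""

    BACKENDS: Dict[str, Type["Backend"]] = {}
    name: str = ""

    def __init_subclass__(cls, **kwargs) -> None:
        super().__init_subclass__(**kwargs)
        # Assumed registration mechanism, for illustration only
        Backend.BACKENDS[cls.name] = cls


class JoblibLike(Backend):
    name = "joblib"


class RayLike(Backend):
    name = "ray"


def init_backend(backend_name: Optional[str] = None) -> Backend:
    # Default to "joblib" when no name is given
    if backend_name is None:
        backend_name = "joblib"
    try:
        cls = Backend.BACKENDS[backend_name]
    except KeyError:
        raise NotImplementedError(
            f"Unexpected parallel backend {backend_name}"
        ) from None
    return cls()
```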

available_cpus

available_cpus() -> int

Platform-independent count of available cores.

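On Linux, this is the number of cores in the process's CPU affinity mask (os.sched_getaffinity(0)), which can be smaller than os.cpu_count(). On other platforms it falls back to os.cpu_count().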

RETURNS DESCRIPTION
int

Number of cores, or 1 if it is not possible to determine.

Source code in src/pydvl/parallel/backend.py
def available_cpus() -> int:
    """Platform-independent count of available cores.

    FIXME: do we really need this or is `os.cpu_count` enough? Is this portable?

    Returns:
        Number of cores, or 1 if it is not possible to determine.
    """
    from platform import system

    if system() != "Linux":
        return os.cpu_count() or 1
    return len(os.sched_getaffinity(0))  # type: ignore
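The distinction matters in containers or under tools like taskset, where a process may be restricted to fewer cores than the machine has. A variant that checks for os.sched_getaffinity directly instead of testing the platform name; this is a design alternative sketched for comparison, not pydvl's code, and available_cpus_sketch is a hypothetical name.

```python
import os


def available_cpus_sketch() -> int:
    """Count usable cores, preferring the CPU affinity mask where supported."""
    # os.sched_getaffinity only exists on some platforms (e.g. Linux)
    if hasattr(os, "sched_getaffinity"):
        return len(os.sched_getaffinity(0))
    # os.cpu_count() returns None when the count cannot be determined
    return os.cpu_count() or 1


n_cpus = available_cpus_sketch()
```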

init_executor

init_executor(
    max_workers: int | None = None,
    config: ParallelConfig | None = None,
    **kwargs: Any,
) -> Generator[Executor, None, None]

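Initializes a futures executor for the given parallel configuration. Deprecated since v0.9.0 and scheduled for removal in v0.10.0; prefer calling executor() on a parallel backend instance.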

PARAMETER DESCRIPTION
max_workers

Maximum number of concurrent tasks.

TYPE: int | None DEFAULT: None

config

Instance of ParallelConfig with cluster address, number of CPUs, etc.

TYPE: ParallelConfig | None DEFAULT: None

kwargs

Other optional parameters that will be passed to the executor.

TYPE: Any DEFAULT: {}

Examples

from pydvl.parallel.futures import init_executor, ParallelConfig

config = ParallelConfig(backend="ray")
with init_executor(max_workers=1, config=config) as executor:
    future = executor.submit(lambda x: x + 1, 1)
    result = future.result()
assert result == 2

from pydvl.parallel.futures import init_executor
with init_executor() as executor:
    results = list(executor.map(lambda x: x + 1, range(5)))
assert results == [1, 2, 3, 4, 5]

Source code in src/pydvl/parallel/futures/__init__.py
@contextmanager
@deprecated(
    target=None,
    deprecated_in="0.9.0",
    remove_in="0.10.0",
)
def init_executor(
    max_workers: int | None = None,
    config: ParallelConfig | None = None,
    **kwargs: Any,
) -> Generator[Executor, None, None]:
    """Initializes a futures executor for the given parallel configuration.

    Args:
        max_workers: Maximum number of concurrent tasks.
        config: instance of [ParallelConfig][pydvl.utils.config.ParallelConfig]
            with cluster address, number of cpus, etc.
        kwargs: Other optional parameter that will be passed to the executor.


    ??? Examples
        ``` python
        from pydvl.parallel.futures import init_executor, ParallelConfig

        config = ParallelConfig(backend="ray")
        with init_executor(max_workers=1, config=config) as executor:
            future = executor.submit(lambda x: x + 1, 1)
            result = future.result()
        assert result == 2
        ```
        ``` python
        from pydvl.parallel.futures import init_executor
        with init_executor() as executor:
            results = list(executor.map(lambda x: x + 1, range(5)))
        assert results == [1, 2, 3, 4, 5]
        ```
    """
    if config is None:
        config = ParallelConfig()
    try:
        cls = ParallelBackend.BACKENDS[config.backend]
        with cls.executor(max_workers=max_workers, config=config, **kwargs) as e:
            yield e
    except KeyError:
        raise NotImplementedError(f"Unexpected parallel backend {config.backend}")