pydvl.parallel
This module provides a common interface to parallelization backends. The supported backends, JoblibParallelBackend and RayParallelBackend, are documented below. Backends should be instantiated directly and passed to the respective valuation method.
We use executors that implement the Executor interface from concurrent.futures to submit tasks in parallel. The basic high-level pattern is:
```python
from pydvl.parallel import JoblibParallelBackend

parallel_backend = JoblibParallelBackend()
with parallel_backend.executor(max_workers=2) as executor:
    future = executor.submit(lambda x: x + 1, 1)
    result = future.result()
assert result == 2
```
Running a map-style job is also easy:
```python
from pydvl.parallel import JoblibParallelBackend

parallel_backend = JoblibParallelBackend()
with parallel_backend.executor(max_workers=2) as executor:
    results = list(executor.map(lambda x: x + 1, range(5)))
assert results == [1, 2, 3, 4, 5]
```
Passing large objects
When running tasks that accept heavy inputs, it is important to first use put() on the object and pass the returned reference as the argument to the callable within submit(). The task itself does not need to be changed in any way: the backend will get() the object and pass it to the function upon invocation.
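The put()/get() idea can be illustrated without a cluster. The following is a minimal, self-contained sketch in which ObjectRef, ToyObjectStore and submit_resolved are hypothetical names invented for this illustration, not part of pydvl. It only mimics the pattern described above: the caller submits a lightweight reference, and the reference is resolved back into the object when the task runs, so the task function stays unchanged.

```python
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass(frozen=True)
class ObjectRef:
    """Hypothetical lightweight handle to an object held in the store."""

    key: int


class ToyObjectStore:
    """Hypothetical in-process stand-in for a backend's object store."""

    def __init__(self) -> None:
        self._objects: Dict[int, Any] = {}

    def put(self, obj: Any) -> ObjectRef:
        # Store the object once and hand out a small reference to it.
        ref = ObjectRef(key=len(self._objects))
        self._objects[ref.key] = obj
        return ref

    def get(self, value: Any) -> Any:
        # Resolve references; pass any other argument through untouched.
        if isinstance(value, ObjectRef):
            return self._objects[value.key]
        return value

    def _call_resolved(self, fn: Callable[..., Any], args: tuple) -> Any:
        # Runs inside the worker: references are resolved only at invocation.
        return fn(*(self.get(a) for a in args))

    def submit_resolved(
        self, executor: Executor, fn: Callable[..., Any], *args: Any
    ) -> Future:
        return executor.submit(self._call_resolved, fn, args)


def task(data: list) -> int:
    # An ordinary function: it knows nothing about references.
    return sum(data)


store = ToyObjectStore()
large_data = list(range(100_000))
data_ref = store.put(large_data)

with ThreadPoolExecutor(max_workers=2) as executor:
    future = store.submit_resolved(executor, task, data_ref)

print(future.result())  # 4999950000
```

With threads the reference saves nothing, since threads share memory. The saving matters for backends such as Ray, where put() places the object in a shared object store once instead of serializing it with every submitted task.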
There is an alternative map-reduce implementation, MapReduceJob, which internally uses joblib's higher-level API with Parallel(), which then indirectly also supports the use of Dask and Ray.
BaseModel
SupervisedModel
Bases: Protocol
This is the standard sklearn Protocol with the methods fit(), predict() and score().
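Because this is a typing.Protocol, a model satisfies it structurally, by having the right methods, without inheriting from any base class. The sketch below assumes simplified signatures (plain Python lists instead of arrays) and uses the hypothetical names SupervisedModelLike and MeanRegressor. Marking the protocol runtime_checkable is also our assumption, made so that isinstance() can be used; that check only verifies that the methods exist, not their signatures.

```python
from typing import List, Protocol, runtime_checkable


@runtime_checkable
class SupervisedModelLike(Protocol):
    """Hypothetical mirror of the sklearn-style fit/predict/score protocol."""

    def fit(self, x: List[float], y: List[float]) -> "SupervisedModelLike": ...

    def predict(self, x: List[float]) -> List[float]: ...

    def score(self, x: List[float], y: List[float]) -> float: ...


class MeanRegressor:
    """Toy model that always predicts the mean of the training targets.

    It does not inherit from SupervisedModelLike.
    """

    def fit(self, x: List[float], y: List[float]) -> "MeanRegressor":
        self.mean_ = sum(y) / len(y)
        return self

    def predict(self, x: List[float]) -> List[float]:
        return [self.mean_ for _ in x]

    def score(self, x: List[float], y: List[float]) -> float:
        # Negative mean squared error, so that higher is better.
        predictions = self.predict(x)
        return -sum((p - t) ** 2 for p, t in zip(predictions, y)) / len(y)


model = MeanRegressor().fit([0.0, 1.0, 2.0], [1.0, 2.0, 3.0])
print(isinstance(model, SupervisedModelLike))  # True
print(model.predict([5.0, 6.0]))  # [2.0, 2.0]
```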
fit
predict
BaggingModel
Bases: Protocol
Any model with the attributes n_estimators and max_samples is considered a bagging model.
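As a Protocol, BaggingModel is satisfied structurally, and runtime checks can cover attributes as well as methods. In the sketch below, BaggingModelLike, ToyBagging and PlainModel are hypothetical names; the attribute types (int for n_estimators, int or float for max_samples, following scikit-learn's bagging estimators) are our assumption. As with any runtime_checkable protocol, isinstance() only checks that the attributes and fit() are present, not their types.

```python
from typing import Any, Protocol, Union, runtime_checkable


@runtime_checkable
class BaggingModelLike(Protocol):
    """Hypothetical mirror: any object with these members counts as bagging."""

    n_estimators: int
    max_samples: Union[int, float]

    def fit(self, x: Any, y: Any) -> Any: ...


class ToyBagging:
    def __init__(self, n_estimators: int = 10, max_samples: float = 0.5) -> None:
        self.n_estimators = n_estimators
        self.max_samples = max_samples

    def fit(self, x: Any, y: Any) -> "ToyBagging":
        return self  # a real model would train n_estimators sub-models here


class PlainModel:
    def fit(self, x: Any, y: Any) -> "PlainModel":
        return self


print(isinstance(ToyBagging(), BaggingModelLike))  # True
print(isinstance(PlainModel(), BaggingModelLike))  # False
```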
fit
CancellationPolicy
Bases: Flag
Policy to use when cancelling futures after exiting an Executor.
Note
Not all backends support all policies.
ATTRIBUTE | DESCRIPTION |
---|---|
NONE | Do not cancel any futures. |
PENDING | Cancel all pending futures, but not running ones. |
RUNNING | Cancel all running futures, but not pending ones. |
ALL | Cancel all pending and running futures. |
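Because CancellationPolicy is a Flag, policies can be combined with | and tested with the in operator. The sketch below uses a stand-in named CancellationPolicyLike; its member values, and ALL being defined as the union of PENDING and RUNNING, are assumptions chosen to be consistent with the table above, not copied from pydvl. The helper describe() is likewise hypothetical.

```python
from enum import Flag


class CancellationPolicyLike(Flag):
    """Hypothetical stand-in mirroring the members listed above."""

    NONE = 0
    PENDING = 1
    RUNNING = 2
    ALL = PENDING | RUNNING  # value 3: cancel both pending and running futures


def describe(policy: CancellationPolicyLike) -> str:
    # Membership tests read naturally for combined flags.
    parts = []
    if CancellationPolicyLike.PENDING in policy:
        parts.append("pending")
    if CancellationPolicyLike.RUNNING in policy:
        parts.append("running")
    return " and ".join(parts) if parts else "nothing"


print(describe(CancellationPolicyLike.NONE))  # nothing
print(describe(CancellationPolicyLike.PENDING))  # pending
print(describe(CancellationPolicyLike.ALL))  # pending and running
```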
ParallelBackend
Abstract base class for all parallel backends.
executor
abstractmethod
classmethod
executor(
max_workers: int | None = None,
*,
config: ParallelConfig | None = None,
cancel_futures: CancellationPolicy | bool = PENDING,
) -> Executor
Returns a futures executor for the parallel backend.
Source code in src/pydvl/parallel/backend.py
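To see how an abstract classmethod shapes this interface, here is a toy backend hierarchy. ToyParallelBackend and ThreadParallelBackend are hypothetical names; the sketch keeps only the max_workers argument and returns a standard-library ThreadPoolExecutor, whereas pydvl's concrete backends return the executors documented below.

```python
from abc import ABC, abstractmethod
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Optional


class ToyParallelBackend(ABC):
    """Hypothetical abstract base: subclasses must provide executor()."""

    @classmethod
    @abstractmethod
    def executor(cls, max_workers: Optional[int] = None) -> Executor:
        """Return a concurrent.futures Executor for this backend."""


class ThreadParallelBackend(ToyParallelBackend):
    """Hypothetical concrete backend built on threads."""

    @classmethod
    def executor(cls, max_workers: Optional[int] = None) -> Executor:
        return ThreadPoolExecutor(max_workers=max_workers)


backend = ThreadParallelBackend()
with backend.executor(max_workers=2) as executor:
    squares = list(executor.map(lambda x: x * x, range(4)))

print(squares)  # [0, 1, 4, 9]
# The base class cannot be instantiated while executor() is abstract.
print(sorted(ToyParallelBackend.__abstractmethods__))  # ['executor']
```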
JoblibParallelBackend
JoblibParallelBackend(config: ParallelConfig | None = None)
Bases: ParallelBackend
Class used to wrap joblib to make it transparent to algorithms.
Source code in src/pydvl/parallel/backends/joblib.py
executor
classmethod
executor(
max_workers: int | None = None,
*,
config: ParallelConfig | None = None,
cancel_futures: CancellationPolicy | bool = NONE,
) -> Executor
Returns a futures executor for the parallel backend.
Example
PARAMETER | DESCRIPTION |
---|---|
max_workers | Maximum number of parallel workers. TYPE: Optional[int] DEFAULT: None |
config | (DEPRECATED) Object configuring parallel computation, with cluster address, number of CPUs, etc. TYPE: Optional[ParallelConfig] DEFAULT: None |
cancel_futures | Policy to use when cancelling futures after exiting an Executor. TYPE: Union[CancellationPolicy, bool] DEFAULT: CancellationPolicy.NONE |

RETURNS | DESCRIPTION |
---|---|
Executor | Instance of _ReusablePoolExecutor (see joblib.externals.loky.reusable_executor). |
Source code in src/pydvl/parallel/backends/joblib.py
RayParallelBackend
RayParallelBackend(config: ParallelConfig | None = None)
Bases: ParallelBackend
Class used to wrap ray to make it transparent to algorithms.
Example
Source code in src/pydvl/parallel/backends/ray.py
executor
classmethod
executor(
max_workers: int | None = None,
*,
config: ParallelConfig | None = None,
cancel_futures: CancellationPolicy | bool = PENDING,
) -> Executor
Returns a futures executor for the parallel backend.
Example
PARAMETER | DESCRIPTION |
---|---|
max_workers | Maximum number of parallel workers. TYPE: Optional[int] DEFAULT: None |
config | (DEPRECATED) Object configuring parallel computation, with cluster address, number of CPUs, etc. TYPE: Optional[ParallelConfig] DEFAULT: None |
cancel_futures | Policy to use when cancelling futures after exiting an Executor. TYPE: Union[CancellationPolicy, bool] DEFAULT: CancellationPolicy.PENDING |

RETURNS | DESCRIPTION |
---|---|
Executor | Instance of RayExecutor. |
Source code in src/pydvl/parallel/backends/ray.py
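The PENDING policy, which is the default for the Ray executor, has the same semantics as the standard library's ThreadPoolExecutor.shutdown(cancel_futures=True) (Python 3.9+): futures still waiting in the queue are cancelled, while a task that is already running is allowed to finish. The sketch below demonstrates that behaviour with the standard library only; it illustrates what the policy means and is not pydvl's implementation.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def blocking_task() -> str:
    started.set()  # signal that this task now occupies the only worker
    release.wait()  # block until the main thread lets it finish
    return "finished"


executor = ThreadPoolExecutor(max_workers=1)
running = executor.submit(blocking_task)
# The single worker is busy, so these futures stay pending in the queue.
pending = [executor.submit(pow, 2, i) for i in range(3)]
started.wait()

# Cancel everything still queued, without waiting for the running task.
executor.shutdown(wait=False, cancel_futures=True)
release.set()

print(running.result())  # finished
print([f.cancelled() for f in pending])  # [True, True, True]
```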
wrap
Wraps a function as a ray remote.
PARAMETER | DESCRIPTION |
---|---|
fun | The function to wrap. |
kwargs | Keyword arguments to pass to @ray.remote. |

RETURNS | DESCRIPTION |
---|---|
Callable | The … |
Source code in src/pydvl/parallel/backends/ray.py
ParallelConfig
dataclass
ParallelConfig(
backend: Literal["joblib", "ray"] = "joblib",
address: Optional[Union[str, Tuple[str, int]]] = None,
n_cpus_local: Optional[int] = None,
logging_level: Optional[int] = None,
wait_timeout: float = 1.0,
)
Configuration for parallel computation backend.
PARAMETER | DESCRIPTION |
---|---|
backend | Type of backend to use. Defaults to 'joblib'. TYPE: Literal["joblib", "ray"] |
address | (DEPRECATED) Address of existing remote or local cluster to use. TYPE: Optional[Union[str, Tuple[str, int]]] DEFAULT: None |
n_cpus_local | (DEPRECATED) Number of CPUs to use when creating a local ray cluster. This has no effect when using an existing ray cluster. TYPE: Optional[int] DEFAULT: None |
logging_level | (DEPRECATED) Logging level for the parallel backend's worker. TYPE: Optional[int] DEFAULT: None |
wait_timeout | (DEPRECATED) Timeout in seconds for waiting on futures. TYPE: float DEFAULT: 1.0 |
MapReduceJob
MapReduceJob(
inputs: Union[Collection[T], T],
map_func: MapFunction[R],
reduce_func: ReduceFunction[R] = identity,
parallel_backend: Optional[ParallelBackend] = None,
config: Optional[ParallelConfig] = None,
*,
map_kwargs: Optional[Dict] = None,
reduce_kwargs: Optional[Dict] = None,
n_jobs: int = -1,
timeout: Optional[float] = None,
)
Bases: Generic[T, R]
Takes an embarrassingly parallel function and runs it in n_jobs parallel jobs, splitting the data evenly into a number of chunks equal to the number of jobs.
Typing information for objects of this class requires the type of the inputs that are split for map_func and the type of its output.
PARAMETER | DESCRIPTION |
---|---|
inputs | The input that will be split and passed to map_func. TYPE: Union[Collection[T], T] |
map_func | Function that will be applied to the input chunks in each job. TYPE: MapFunction[R] |
reduce_func | Function that will be applied to the results of map_func. TYPE: ReduceFunction[R] DEFAULT: identity |
map_kwargs | Keyword arguments that will be passed to map_func. TYPE: Optional[Dict] DEFAULT: None |
reduce_kwargs | Keyword arguments that will be passed to reduce_func. TYPE: Optional[Dict] DEFAULT: None |
parallel_backend | Parallel backend instance to use for parallelizing computations. If … TYPE: Optional[ParallelBackend] DEFAULT: None |
config | (DEPRECATED) Object configuring parallel computation, with cluster address, number of CPUs, etc. TYPE: Optional[ParallelConfig] DEFAULT: None |
n_jobs | Number of parallel jobs to run. Does not accept 0. TYPE: int DEFAULT: -1 |
Example
A simple usage example with 2 jobs:
```python
>>> from pydvl.parallel import MapReduceJob
>>> import numpy as np
>>> map_reduce_job: MapReduceJob[np.ndarray, np.ndarray] = MapReduceJob(
...     np.arange(5),
...     map_func=np.sum,
...     reduce_func=np.sum,
...     n_jobs=2,
... )
>>> map_reduce_job()
10
```
When passed a single object as input, it will be repeated for each job.
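The chunking and reduction described above can be sketched with the standard library alone. chunkify and map_reduce_sketch are hypothetical helpers, not pydvl's implementation: this version uses a thread pool, requires a Sequence (rather than any Collection) so that it can slice, only accepts a positive n_jobs, and, as described above, repeats a single non-sequence input once per job.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Sequence


def chunkify(inputs: Sequence[Any], n_chunks: int) -> List[Sequence[Any]]:
    """Split inputs into n_chunks contiguous chunks whose sizes differ by at most one."""
    size, extra = divmod(len(inputs), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        end = start + size + (1 if i < extra else 0)
        chunks.append(inputs[start:end])
        start = end
    return chunks


def map_reduce_sketch(
    inputs: Any,
    map_func: Callable[[Any], Any],
    reduce_func: Callable[[List[Any]], Any],
    n_jobs: int,
) -> Any:
    if n_jobs < 1:
        raise ValueError("this sketch needs a positive number of jobs")
    if isinstance(inputs, Sequence):
        chunks = chunkify(inputs, n_jobs)
    else:
        chunks = [inputs] * n_jobs  # a single object is repeated for each job
    with ThreadPoolExecutor(max_workers=n_jobs) as executor:
        results = list(executor.map(map_func, chunks))
    return reduce_func(results)


# Chunks [0, 1, 2] and [3, 4] map to 3 and 7, which reduce to 10.
print(map_reduce_sketch(list(range(5)), sum, sum, n_jobs=2))  # 10
# A single object: each of the 3 jobs maps 21 to 42, which reduce to 126.
print(map_reduce_sketch(21, lambda x: x * 2, sum, n_jobs=3))  # 126
```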
Source code in src/pydvl/parallel/map_reduce.py
n_jobs
property
writable
n_jobs: int
Effective number of jobs according to the used ParallelBackend instance.
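A default of n_jobs=-1 together with the rule that 0 is not accepted is familiar from joblib, where negative values count back from the number of CPUs: -1 means all CPUs, -2 all but one, and in general n_cpus + 1 + n_jobs, never fewer than one. The helper below is a hypothetical sketch of that convention only, under the assumption that it applies here; the actual value of this property depends on the ParallelBackend instance in use.

```python
import os
from typing import Optional


def effective_n_jobs_sketch(n_jobs: int, n_cpus: Optional[int] = None) -> int:
    """Hypothetical: resolve joblib-style n_jobs values to a positive job count."""
    if n_jobs == 0:
        raise ValueError("n_jobs == 0 is not allowed")
    if n_cpus is None:
        n_cpus = os.cpu_count() or 1  # os.cpu_count() may return None
    if n_jobs < 0:
        # -1 -> all CPUs, -2 -> all but one, ..., never fewer than one job
        return max(1, n_cpus + 1 + n_jobs)
    return n_jobs


print(effective_n_jobs_sketch(-1, n_cpus=8))  # 8
print(effective_n_jobs_sketch(-2, n_cpus=8))  # 7
print(effective_n_jobs_sketch(3, n_cpus=8))  # 3
```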
__call__
__call__(seed: Optional[Union[Seed, SeedSequence]] = None) -> R
Runs the map-reduce job.
PARAMETER | DESCRIPTION |
---|---|
seed | Either an instance of a numpy random number generator or a seed for it. TYPE: Optional[Union[Seed, SeedSequence]] DEFAULT: None |

RETURNS | DESCRIPTION |
---|---|
R | The result of the reduce function. |
Source code in src/pydvl/parallel/map_reduce.py
ensure_seed_sequence
ensure_seed_sequence(
seed: Optional[Union[Seed, SeedSequence]] = None,
) -> SeedSequence
If the passed seed is a SeedSequence object, it is returned as is. If it is a Generator, the internal protected seed sequence is extracted from the generator. Otherwise, a new SeedSequence object is created from the passed (optional) seed.
PARAMETER | DESCRIPTION |
---|---|
seed | Either an int, a Generator object, a SeedSequence object or None. TYPE: Optional[Union[Seed, SeedSequence]] DEFAULT: None |

RETURNS | DESCRIPTION |
---|---|
SeedSequence | A SeedSequence object. |
New in version 0.7.0
Source code in src/pydvl/utils/types.py
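The three cases above map directly onto numpy's random API. The following is a sketch under that reading, named ensure_seed_sequence_sketch to make clear that it is not pydvl's source; it reaches the generator's seed sequence through the protected attribute bit_generator._seed_seq, which is what the description refers to. The last lines use SeedSequence.spawn(), numpy's documented way of deriving independent child seeds, for example for parallel workers.

```python
from typing import Optional, Union

import numpy as np
from numpy.random import Generator, SeedSequence


def ensure_seed_sequence_sketch(
    seed: Optional[Union[int, Generator, SeedSequence]] = None,
) -> SeedSequence:
    if isinstance(seed, SeedSequence):
        return seed  # already a SeedSequence: return it as is
    if isinstance(seed, Generator):
        # Protected attribute holding the SeedSequence the generator was built from.
        return seed.bit_generator._seed_seq
    return SeedSequence(seed)  # int, or None to draw fresh entropy from the OS


ss = SeedSequence(42)
print(ensure_seed_sequence_sketch(ss) is ss)  # True
print(ensure_seed_sequence_sketch(np.random.default_rng(42)).entropy)  # 42
print(ensure_seed_sequence_sketch(7).entropy)  # 7

# Independent child sequences, e.g. one per parallel worker.
children = ensure_seed_sequence_sketch(42).spawn(2)
print([np.random.default_rng(child).integers(100) for child in children])
```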
init_parallel_backend
init_parallel_backend(
config: ParallelConfig | None = None, backend_name: str | None = None
) -> ParallelBackend
Initializes the parallel backend and returns an instance of it.
The following example creates a parallel backend instance with the default configuration, which is a local joblib backend.
If you don't pass any arguments, then by default it will instantiate the JoblibParallelBackend:
To create a parallel backend instance with, for example, ray as a backend, you can pass the backend name as a string:
The following is an example of the deprecated way for instantiating a parallel backend:
PARAMETER | DESCRIPTION |
---|---|
backend_name | Name of the backend to instantiate. TYPE: Optional[str] DEFAULT: None |
config | (DEPRECATED) Object configuring parallel computation, with cluster address, number of CPUs, etc. TYPE: Optional[ParallelConfig] DEFAULT: None |
Source code in src/pydvl/parallel/backend.py
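Selecting a backend by name, with joblib as the fallback, amounts to a small registry lookup. The sketch below uses hypothetical stand-in classes (ToyBackend, ToyJoblibBackend, ToyRayBackend) and a hypothetical function name, init_backend_sketch, so that it runs without pydvl. Only the default (joblib) and the example name ("ray") come from the description above; raising ValueError for unknown names is our own choice.

```python
from typing import Dict, Optional, Type


class ToyBackend:
    name = "base"


class ToyJoblibBackend(ToyBackend):
    name = "joblib"


class ToyRayBackend(ToyBackend):
    name = "ray"


# Hypothetical registry mapping backend names to classes.
_BACKENDS: Dict[str, Type[ToyBackend]] = {
    "joblib": ToyJoblibBackend,
    "ray": ToyRayBackend,
}


def init_backend_sketch(backend_name: Optional[str] = None) -> ToyBackend:
    name = backend_name or "joblib"  # no argument: fall back to joblib
    try:
        backend_cls = _BACKENDS[name]
    except KeyError:
        raise ValueError(f"Unknown parallel backend: {name!r}") from None
    return backend_cls()


print(type(init_backend_sketch()).__name__)  # ToyJoblibBackend
print(type(init_backend_sketch("ray")).__name__)  # ToyRayBackend
```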
available_cpus
available_cpus() -> int
Platform-independent count of available cores.
FIXME: do we really need this or is os.cpu_count() enough? Is this portable?
RETURNS | DESCRIPTION |
---|---|
int | Number of cores, or 1 if it is not possible to determine. |
Source code in src/pydvl/parallel/backend.py
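On the FIXME above: os.cpu_count() reports the logical CPUs of the whole machine and may return None, while os.sched_getaffinity(0), available only on some platforms such as Linux, returns the set of CPUs the current process is allowed to run on, which can be smaller (for example when the process is pinned with taskset). A minimal portable sketch that combines both, with the fallback to 1 from the return description, could look like this; the name is hypothetical and this is not necessarily how pydvl implements the function.

```python
import os


def available_cpus_sketch() -> int:
    """Number of CPUs this process may use, or 1 if it cannot be determined."""
    if hasattr(os, "sched_getaffinity"):
        # Respects the process's CPU affinity mask (e.g. set via taskset).
        return len(os.sched_getaffinity(0)) or 1
    # Elsewhere fall back to the machine-wide count, which may be None.
    return os.cpu_count() or 1


print(available_cpus_sketch())
```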
init_executor
init_executor(
max_workers: int | None = None,
config: ParallelConfig | None = None,
**kwargs: Any,
) -> Generator[Executor, None, None]
Initializes a futures executor for the given parallel configuration.
PARAMETER | DESCRIPTION |
---|---|
max_workers | Maximum number of concurrent tasks. TYPE: Optional[int] DEFAULT: None |
config | Instance of ParallelConfig with cluster address, number of CPUs, etc. TYPE: Optional[ParallelConfig] DEFAULT: None |
kwargs | Other optional parameters that will be passed to the executor. TYPE: Any |
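The return type Generator[Executor, None, None] suggests a generator-based context manager: the executor is created, yielded to the with block and shut down afterwards. Below is a minimal sketch of that shape, using the hypothetical name init_executor_sketch and a standard-library ThreadPoolExecutor in place of a configured backend; extra keyword arguments are forwarded to the executor, as described for kwargs.

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from contextlib import contextmanager
from typing import Any, Generator, Optional


@contextmanager
def init_executor_sketch(
    max_workers: Optional[int] = None, **kwargs: Any
) -> Generator[Executor, None, None]:
    """Hypothetical: yield an executor and always shut it down afterwards."""
    executor = ThreadPoolExecutor(max_workers=max_workers, **kwargs)
    try:
        yield executor
    finally:
        executor.shutdown(wait=True)  # runs even if the with block raises


with init_executor_sketch(max_workers=2, thread_name_prefix="worker") as executor:
    results = list(executor.map(abs, [-3, -2, 1]))

print(results)  # [3, 2, 1]
```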