Map reduce

This module contains a wrapper around joblib's Parallel() class that makes it easy to run map-reduce jobs.

Deprecation

This interface might be deprecated or changed in a future release before 1.0.

MapReduceJob(inputs, map_func, reduce_func=identity, map_kwargs=None, reduce_kwargs=None, config=ParallelConfig(), *, n_jobs=-1, timeout=None)

Bases: Generic[T, R]

Takes an embarrassingly parallel function and runs it in n_jobs parallel jobs, splitting the data evenly into a number of chunks equal to the number of jobs.

Objects of this class are generic in two types: the type of the inputs that are split for map_func, and the type of its output.

PARAMETERS
inputs

The input that will be split and passed to map_func. If it is not a sequence object, it will be repeated n_jobs times.

TYPE: Union[Collection[T], T]

map_func

Function that will be applied to the input chunks in each job.

TYPE: MapFunction[R]

reduce_func

Function that will be applied to the results of map_func to reduce them.

TYPE: ReduceFunction[R] DEFAULT: identity

map_kwargs

Keyword arguments that will be passed to map_func in each job. Alternatively, one can use functools.partial (see the last example below).

TYPE: Optional[Dict] DEFAULT: None

reduce_kwargs

Keyword arguments that will be passed to reduce_func in each job. Alternatively, one can use functools.partial.

TYPE: Optional[Dict] DEFAULT: None

config

Instance of ParallelConfig with the cluster address, number of CPUs, etc.

TYPE: ParallelConfig DEFAULT: ParallelConfig()

n_jobs

Number of parallel jobs to run. Does not accept 0.

TYPE: int DEFAULT: -1

Example

A simple usage example with 2 jobs:

>>> from pydvl.parallel import MapReduceJob
>>> import numpy as np
>>> map_reduce_job: MapReduceJob[np.ndarray, np.ndarray] = MapReduceJob(
...     np.arange(5),
...     map_func=np.sum,
...     reduce_func=np.sum,
...     n_jobs=2,
... )
>>> map_reduce_job()
10

When passed a single object as input, it will be repeated for each job:

>>> from pydvl.parallel import MapReduceJob
>>> import numpy as np
>>> map_reduce_job: MapReduceJob[int, np.ndarray] = MapReduceJob(
...     5,
...     map_func=lambda x: np.array([x]),
...     reduce_func=np.sum,
...     n_jobs=2,
... )
>>> map_reduce_job()
10
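
As mentioned in the map_kwargs and reduce_kwargs descriptions above, extra keyword arguments can be bound either through those dictionaries or with functools.partial. A minimal sketch of both equivalent approaches, using a hypothetical shift function that is not part of pydvl:

>>> from functools import partial
>>> from pydvl.parallel import MapReduceJob
>>> import numpy as np
>>> def shift(x, delta=0):  # hypothetical helper, not part of pydvl
...     return np.asarray(x) + delta
>>> job_a: MapReduceJob[np.ndarray, np.ndarray] = MapReduceJob(
...     np.arange(4),
...     map_func=shift,
...     map_kwargs=dict(delta=1),
...     reduce_func=np.concatenate,
...     n_jobs=2,
... )
>>> job_b: MapReduceJob[np.ndarray, np.ndarray] = MapReduceJob(
...     np.arange(4),
...     map_func=partial(shift, delta=1),
...     reduce_func=np.concatenate,
...     n_jobs=2,
... )
>>> job_a()
array([1, 2, 3, 4])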

Source code in src/pydvl/parallel/map_reduce.py
def __init__(
    self,
    inputs: Union[Collection[T], T],
    map_func: MapFunction[R],
    reduce_func: ReduceFunction[R] = identity,
    map_kwargs: Optional[Dict] = None,
    reduce_kwargs: Optional[Dict] = None,
    config: ParallelConfig = ParallelConfig(),
    *,
    n_jobs: int = -1,
    timeout: Optional[float] = None,
):
    self.config = config
    parallel_backend = init_parallel_backend(self.config)
    self.parallel_backend = parallel_backend

    self.timeout = timeout

    # This uses the setter defined below
    self.n_jobs = n_jobs

    self.inputs_ = inputs

    self.map_kwargs = map_kwargs if map_kwargs is not None else dict()
    self.reduce_kwargs = reduce_kwargs if reduce_kwargs is not None else dict()

    # Wrap the map function so that it accepts `job_id` and `seed` keyword
    # arguments, even if the original function does not declare them.
    self._map_func = reduce(maybe_add_argument, ["job_id", "seed"], map_func)
    self._reduce_func = reduce_func

n_jobs: int property writable

Effective number of jobs according to the ParallelBackend instance in use.

__call__(seed=None)

Runs the map-reduce job.

PARAMETERS
seed

Either an instance of a numpy random number generator or a seed for it.

TYPE: Optional[Union[Seed, SeedSequence]] DEFAULT: None

RETURNS
R

The result of the reduce function.
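
Passing a seed makes runs with stochastic map functions reproducible: each job receives its own seed spawned from the one given here. A sketch with a hypothetical stochastic map function (rng_sum is not part of pydvl):

>>> from pydvl.parallel import MapReduceJob
>>> import numpy as np
>>> def rng_sum(x, seed=None):  # hypothetical, draws random numbers per chunk
...     rng = np.random.default_rng(seed)
...     return int(rng.integers(0, 10, size=len(x)).sum())
>>> job: MapReduceJob[np.ndarray, int] = MapReduceJob(
...     np.arange(8), map_func=rng_sum, reduce_func=sum, n_jobs=2
... )
>>> job(seed=42) == job(seed=42)
True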

Source code in src/pydvl/parallel/map_reduce.py
def __call__(
    self,
    seed: Optional[Union[Seed, SeedSequence]] = None,
) -> R:
    """
    Runs the map-reduce job.

    Args:
        seed: Either an instance of a numpy random number generator or a seed for
            it.

    Returns:
         The result of the reduce function.
    """
    parallel_kwargs: Dict[str, Any] = {"n_jobs": self.n_jobs}
    if self.config.backend == "joblib":
        parallel_kwargs["backend"] = "loky"
    else:
        parallel_kwargs["backend"] = self.config.backend
    # In joblib the levels are reversed.
    # 0 means no logging and 50 means log everything to stdout
    if self.config.logging_level is not None:
        parallel_kwargs["verbose"] = 50 - self.config.logging_level
    seed_seq = ensure_seed_sequence(seed)
    with Parallel(**parallel_kwargs) as parallel:
        chunks = self._chunkify(self.inputs_, n_chunks=self.n_jobs)
        map_results: List[R] = parallel(
            delayed(self._map_func)(
                next_chunk, job_id=j, seed=seed, **self.map_kwargs
            )
            for j, (next_chunk, seed) in enumerate(
                zip(chunks, seed_seq.spawn(len(chunks)))
            )
        )

    reduce_results: R = self._reduce_func(map_results, **self.reduce_kwargs)
    return reduce_results
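
The per-job seeds above are derived with numpy's SeedSequence.spawn, which deterministically produces independent child sequences from the parent. Each job therefore gets its own random stream, while the run as a whole stays reproducible for a fixed seed:

>>> from numpy.random import SeedSequence, default_rng
>>> def draw(ss):  # one random integer per child sequence
...     return int(default_rng(ss).integers(0, 10**6))
>>> first = [draw(c) for c in SeedSequence(7).spawn(2)]
>>> second = [draw(c) for c in SeedSequence(7).spawn(2)]
>>> first == second
True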