Skip to content

Ray

RayExecutor(max_workers=None, *, config=ParallelConfig(), cancel_futures=CancellationPolicy.ALL)

Bases: Executor

Asynchronous executor using Ray that implements the concurrent.futures API.

It shouldn't be initialized directly. You should instead call init_executor().

PARAMETER DESCRIPTION
max_workers

Maximum number of concurrent tasks. Each task can request itself any number of vCPUs. You must ensure the product of this value and the n_cpus_per_job parameter passed to submit() does not exceed available cluster resources. If set to None, it will default to the total number of vCPUs in the ray cluster.

TYPE: Optional[int] DEFAULT: None

config

instance of ParallelConfig with cluster address, number of cpus, etc.

TYPE: ParallelConfig DEFAULT: ParallelConfig()

cancel_futures

Select which futures will be cancelled when exiting this context manager. Pending is the default, which will cancel all pending futures, but not running ones, as done by concurrent.futures.ProcessPoolExecutor. Additionally, All cancels all pending and running futures, and None doesn't cancel any. See CancellationPolicy

TYPE: CancellationPolicy DEFAULT: ALL

Source code in src/pydvl/utils/parallel/futures/ray.py
@deprecated(
    target=True,
    deprecated_in="0.7.0",
    remove_in="0.8.0",
    args_mapping={"cancel_futures_on_exit": "cancel_futures"},
)
def __init__(
    self,
    max_workers: Optional[int] = None,
    *,
    config: ParallelConfig = ParallelConfig(),
    cancel_futures: CancellationPolicy = CancellationPolicy.ALL,
):
    if config.backend != "ray":
        raise ValueError(
            f"Parallel backend must be set to 'ray' and not '{config.backend}'"
        )
    if max_workers is not None:
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")
        max_workers = max_workers

    if isinstance(cancel_futures, CancellationPolicy):
        self._cancel_futures = cancel_futures
    else:
        self._cancel_futures = (
            CancellationPolicy.PENDING
            if cancel_futures
            else CancellationPolicy.NONE
        )

    self.config = {"address": config.address, "logging_level": config.logging_level}
    if config.address is None:
        self.config["num_cpus"] = config.n_cpus_local

    if not ray.is_initialized():
        ray.init(**self.config)

    self._max_workers = max_workers
    if self._max_workers is None:
        self._max_workers = int(ray._private.state.cluster_resources()["CPU"])

    self._shutdown = False
    self._shutdown_lock = threading.Lock()
    self._queue_lock = threading.Lock()
    self._work_queue: "queue.Queue[Optional[_WorkItem]]" = queue.Queue(
        maxsize=self._max_workers
    )
    self._pending_queue: "queue.SimpleQueue[Optional[_WorkItem]]" = (
        queue.SimpleQueue()
    )

    # Work Item Manager Thread
    self._work_item_manager_thread: Optional[_WorkItemManagerThread] = None

submit(fn, *args, **kwargs)

Submits a callable to be executed with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns a Future instance representing the execution of the callable.

PARAMETER DESCRIPTION
fn

Callable.

TYPE: Callable[..., T]

args

Positional arguments that will be passed to fn.

DEFAULT: ()

kwargs

Keyword arguments that will be passed to fn. It can also optionally contain options for the ray remote function as a dictionary as the keyword argument remote_function_options.

DEFAULT: {}

Returns: A Future representing the given call.

RAISES DESCRIPTION
RuntimeError

If a task is submitted after the executor has been shut down.

Source code in src/pydvl/utils/parallel/futures/ray.py
def submit(self, fn: Callable[..., T], *args, **kwargs) -> "Future[T]":
    r"""Submits a callable to be executed with the given arguments.

    Schedules the callable to be executed as fn(\*args, \**kwargs)
    and returns a Future instance representing the execution of the callable.

    Args:
        fn: Callable.
        args: Positional arguments that will be passed to `fn`.
        kwargs: Keyword arguments that will be passed to `fn`.
            It can also optionally contain options for the ray remote function
            as a dictionary as the keyword argument `remote_function_options`.
    Returns:
        A Future representing the given call.

    Raises:
        RuntimeError: If a task is submitted after the executor has been shut down.
    """
    with self._shutdown_lock:
        logger.debug("executor acquired shutdown lock")
        if self._shutdown:
            raise RuntimeError("cannot schedule new futures after shutdown")

        logging.debug("Creating future and putting work item in work queue")
        future: "Future[T]" = Future()
        remote_function_options = kwargs.pop("remote_function_options", None)
        w = _WorkItem(
            future,
            fn,
            args,
            kwargs,
            remote_function_options=remote_function_options,
        )
        self._put_work_item_in_queue(w)
        # We delay starting the thread until the first call to submit
        self._start_work_item_manager_thread()
        return future

shutdown(wait=True, *, cancel_futures=None)

Clean up the resources associated with the Executor.

This method tries to mimic the behaviour of Executor.shutdown while allowing one more value for cancel_futures which instructs it to use the CancellationPolicy defined upon construction.

PARAMETER DESCRIPTION
wait

Whether to wait for pending futures to finish.

TYPE: bool DEFAULT: True

cancel_futures

Overrides the executor's default policy for cancelling futures on exit. If True, all pending futures are cancelled, and if False, no futures are cancelled. If None (default), the executor's policy set at initialization is used.

TYPE: Optional[bool] DEFAULT: None

Source code in src/pydvl/utils/parallel/futures/ray.py
def shutdown(
    self, wait: bool = True, *, cancel_futures: Optional[bool] = None
) -> None:
    """Clean up the resources associated with the Executor.

    This method tries to mimic the behaviour of
    [Executor.shutdown][concurrent.futures.Executor.shutdown]
    while allowing one more value for ``cancel_futures`` which instructs it
    to use the [CancellationPolicy][pydvl.utils.parallel.backend.CancellationPolicy]
    defined upon construction.

    Args:
        wait: Whether to wait for pending futures to finish.
        cancel_futures: Overrides the executor's default policy for
            cancelling futures on exit. If ``True``, all pending futures are
            cancelled, and if ``False``, no futures are cancelled. If ``None``
            (default), the executor's policy set at initialization is used.
    """
    logger.debug("executor shutting down")
    with self._shutdown_lock:
        logger.debug("executor acquired shutdown lock")
        self._shutdown = True
        self._cancel_futures = {
            None: self._cancel_futures,
            True: CancellationPolicy.PENDING,
            False: CancellationPolicy.NONE,
        }[cancel_futures]

    if wait:
        logger.debug("executor waiting for futures to finish")
        if self._work_item_manager_thread is not None:
            # Putting None in the queue to signal
            # to work item manager thread that we are shutting down
            self._put_work_item_in_queue(None)
            logger.debug(
                "executor waiting for work item manager thread to terminate"
            )
            self._work_item_manager_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._work_item_manager_thread = None
        del self._work_queue
        del self._pending_queue

__exit__(exc_type, exc_val, exc_tb)

Exit the runtime context related to the RayExecutor object.

Source code in src/pydvl/utils/parallel/futures/ray.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Exit the runtime context related to the RayExecutor object."""
    self.shutdown()
    return False

Last update: 2023-09-02
Created: 2023-09-02