pydvl.utils.caching

This module provides caching of functions.

PyDVL can cache (memoize) the computation of the utility function and speed up some computations for data valuation.

Warning

Function evaluations are cached with a key based on the function's signature and code. This can lead to undesired cache hits, see Cache reuse.

Remember not to reuse utility objects for different datasets.

Configuration

Caching is disabled by default but can be enabled easily, see Setting up the cache. When enabled, it will be added to any callable used to construct a Utility (done with the wrap method of CacheBackend). Depending on the nature of the utility you might want to enable the computation of a running average of function values, see Usage with stochastic functions. You can see all configuration options under CachedFuncConfig.

Supported Backends

pyDVL supports three different caching backends:

  • InMemoryCacheBackend: an in-memory cache backend that uses a dictionary to store and retrieve cached values. This is used to share cached values between threads in a single process.
  • DiskCacheBackend: a disk-based cache backend that uses pickled values written to and read from disk. This is used to share cached values between processes in a single machine.
  • MemcachedCacheBackend: a Memcached-based cache backend that uses pickled values written to and read from a Memcached server. This is used to share cached values between processes across multiple machines.

    Info

    This specific backend requires optional dependencies not installed by default. See Extra dependencies for more information.

Usage with stochastic functions

In addition to standard memoization, the wrapped functions can compute a running average and the standard error of repeated evaluations for the same input. This can be useful for stochastic functions with high variance (e.g. model training for small sample sizes), but it drastically reduces the speed benefits of memoization.

This behaviour can be activated with the option allow_repeated_evaluations.

Cache reuse

When working directly with CachedFunc, it is essential to only cache pure functions. If they have any kind of state, either internal or external (e.g. a closure over some data that may change), then the cache will fail to notice this and the same value will be returned.

When a function is wrapped with CachedFunc for memoization, its signature (input and output names) and code are used as a key for the cache.

If you are running experiments with the same Utility but different datasets, evaluations of the utility on new data will return old values, because utilities only use sample indices as arguments: from the point of view of the cache, there is no way to tell index '1' of dataset A apart from index '1' of dataset B. One solution is to empty the cache between runs by calling the clear method of the cache backend instance, but the preferred one is to use a different Utility object for each dataset.

Unexpected cache misses

Because all arguments to a function are used as part of the key for the cache, sometimes one must exclude some of them. For example, if a function is going to run across multiple processes and some reporting arguments are added (like a job_id for logging purposes), these will be part of the signature and make the functions distinct in the eyes of the cache. This can be avoided with the ignore_args option in the configuration.

CacheStats dataclass

CacheStats(
    sets: int = 0,
    misses: int = 0,
    hits: int = 0,
    timeouts: int = 0,
    errors: int = 0,
    reconnects: int = 0,
)

Class used to store statistics gathered by cached functions.

ATTRIBUTE DESCRIPTION
sets

Number of times a value was set in the cache.

TYPE: int

misses

Number of times a value was not found in the cache.

TYPE: int

hits

Number of times a value was found in the cache.

TYPE: int

timeouts

Number of times a timeout occurred.

TYPE: int

errors

Number of times an error occurred.

TYPE: int

reconnects

Number of times the client reconnected to the server.

TYPE: int

CacheBackend

CacheBackend()

Bases: ABC

Abstract base class for cache backends.

Defines interface for cache access including wrapping callables, getting/setting results, clearing cache, and combining cache keys.

ATTRIBUTE DESCRIPTION
stats

Cache statistics tracker.

Source code in src/pydvl/utils/caching/base.py
def __init__(self) -> None:
    self.stats = CacheStats()

wrap

wrap(
    func: Callable, *, config: Optional[CachedFuncConfig] = None
) -> CachedFunc

Wraps a function to cache its results.

PARAMETER DESCRIPTION
func

The function to wrap.

TYPE: Callable

config

Optional caching options for the wrapped function.

TYPE: Optional[CachedFuncConfig] DEFAULT: None

RETURNS DESCRIPTION
CachedFunc

The wrapped cached function.

Source code in src/pydvl/utils/caching/base.py
def wrap(
    self,
    func: Callable,
    *,
    config: Optional[CachedFuncConfig] = None,
) -> "CachedFunc":
    """Wraps a function to cache its results.

    Args:
        func: The function to wrap.
        config: Optional caching options for the wrapped function.

    Returns:
        The wrapped cached function.
    """
    return CachedFunc(
        func,
        cache_backend=self,
        config=config,
    )

get abstractmethod

get(key: str) -> Optional[CacheResult]

Abstract method to retrieve a cached result.

Implemented by subclasses.

PARAMETER DESCRIPTION
key

The cache key.

TYPE: str

RETURNS DESCRIPTION
Optional[CacheResult]

The cached result or None if not found.

Source code in src/pydvl/utils/caching/base.py
@abstractmethod
def get(self, key: str) -> Optional[CacheResult]:
    """Abstract method to retrieve a cached result.

    Implemented by subclasses.

    Args:
        key: The cache key.

    Returns:
        The cached result or None if not found.
    """
    pass

set abstractmethod

set(key: str, value: CacheResult) -> None

Abstract method to set a cached result.

Implemented by subclasses.

PARAMETER DESCRIPTION
key

The cache key.

TYPE: str

value

The result to cache.

TYPE: CacheResult

Source code in src/pydvl/utils/caching/base.py
@abstractmethod
def set(self, key: str, value: CacheResult) -> None:
    """Abstract method to set a cached result.

    Implemented by subclasses.

    Args:
        key: The cache key.
        value: The result to cache.
    """
    pass

clear abstractmethod

clear() -> None

Abstract method to clear the entire cache.

Source code in src/pydvl/utils/caching/base.py
@abstractmethod
def clear(self) -> None:
    """Abstract method to clear the entire cache."""
    pass

combine_hashes abstractmethod

combine_hashes(*args: str) -> str

Abstract method to combine cache keys.

Source code in src/pydvl/utils/caching/base.py
@abstractmethod
def combine_hashes(self, *args: str) -> str:
    """Abstract method to combine cache keys."""
    pass

CachedFunc

CachedFunc(
    func: Callable[..., float],
    *,
    cache_backend: CacheBackend,
    config: Optional[CachedFuncConfig] = None,
)

Caches callable function results with a provided cache backend.

Wraps a callable function to cache its results using an instance of a subclass of CacheBackend.

This class is heavily inspired by joblib.memory.MemorizedFunc.

This class caches calls to the wrapped callable by generating a hash key based on the wrapped callable's code, the arguments passed to it and the optional hash_prefix.

Warning

This class only works with hashable arguments to the wrapped callable.

PARAMETER DESCRIPTION
func

Callable to wrap.

TYPE: Callable[..., float]

cache_backend

Instance of a CacheBackend subclass that handles setting and getting values.

TYPE: CacheBackend

config

Configuration for wrapped function.

TYPE: Optional[CachedFuncConfig] DEFAULT: None

Source code in src/pydvl/utils/caching/base.py
def __init__(
    self,
    func: Callable[..., float],
    *,
    cache_backend: CacheBackend,
    config: Optional[CachedFuncConfig] = None,
) -> None:
    self.func = func
    self.cache_backend = cache_backend
    if config is None:
        config = CachedFuncConfig()
    self.config = config

    self.__doc__ = f"A wrapper around {func.__name__}() with caching enabled.\n" + (
        CachedFunc.__doc__ or ""
    )
    self.__name__ = f"cached_{func.__name__}"
    path = list(reversed(func.__qualname__.split(".")))
    patched = [f"cached_{path[0]}"] + path[1:]
    self.__qualname__ = ".".join(reversed(patched))

stats property

stats: CacheStats

Cache backend statistics.

__call__

__call__(*args, **kwargs) -> float

Call the wrapped cached function.

Executes the wrapped function, caching and returning the result.

Source code in src/pydvl/utils/caching/base.py
def __call__(self, *args, **kwargs) -> float:
    """Call the wrapped cached function.

    Executes the wrapped function, caching and returning the result.
    """
    return self._cached_call(args, kwargs)

CachedFuncConfig dataclass

CachedFuncConfig(
    hash_prefix: Optional[str] = None,
    ignore_args: Collection[str] = list(),
    time_threshold: float = 0.3,
    allow_repeated_evaluations: bool = False,
    rtol_stderr: float = 0.1,
    min_repetitions: int = 3,
)

Configuration for cached functions and methods, providing memoization of function calls.

Instances of this class are typically used as arguments for the construction of a Utility.

PARAMETER DESCRIPTION
hash_prefix

Optional string prefix that is prepended to the cache key. This can be provided in order to guarantee cache reuse across runs.

TYPE: Optional[str] DEFAULT: None

ignore_args

Do not take these keyword arguments into account when hashing the wrapped function for usage as key. This allows sharing the cache among different jobs for the same experiment run if the callable happens to have "nuisance" parameters like job_id which do not affect the result of the computation.

TYPE: Collection[str] DEFAULT: list()

time_threshold

Computations taking less time than this many seconds are not cached. A value of 0 means that results are always cached.

TYPE: float DEFAULT: 0.3

allow_repeated_evaluations

If True, repeated calls to a function with the same arguments will be allowed and outputs averaged until the running standard deviation of the mean stabilizes below rtol_stderr * mean.

TYPE: bool DEFAULT: False

rtol_stderr

Relative tolerance for repeated evaluations. More precisely, the cached function will stop evaluating the wrapped callable once the standard deviation of the mean is smaller than rtol_stderr * mean.

TYPE: float DEFAULT: 0.1

min_repetitions

Minimum number of times that a function evaluation on the same arguments is repeated before returning cached values. Useful for stochastic functions only. If the model training is very noisy, set this number to higher values to reduce variance.

TYPE: int DEFAULT: 3

DiskCacheBackend

DiskCacheBackend(cache_dir: Optional[Union[PathLike, str]] = None)

Bases: CacheBackend

Disk cache backend that stores results in files.

Implements the CacheBackend interface for a disk-based cache. Stores cache entries as pickled files on disk, keyed by cache key. This allows sharing evaluations across processes in a single node/computer.

PARAMETER DESCRIPTION
cache_dir

Base directory for cache storage.

TYPE: Optional[Union[PathLike, str]] DEFAULT: None

ATTRIBUTE DESCRIPTION
cache_dir

Base directory for cache storage.

Example

Basic usage:

>>> from pydvl.utils.caching.disk import DiskCacheBackend
>>> cache_backend = DiskCacheBackend()
>>> cache_backend.clear()
>>> value = 42
>>> cache_backend.set("key", value)
>>> cache_backend.get("key")
42

Callable wrapping:

>>> from pydvl.utils.caching.disk import DiskCacheBackend
>>> cache_backend = DiskCacheBackend()
>>> cache_backend.clear()
>>> value = 42
>>> def foo(x: int):
...     return x + 1
...
>>> wrapped_foo = cache_backend.wrap(foo)
>>> wrapped_foo(value)
43
>>> wrapped_foo.stats.misses
1
>>> wrapped_foo.stats.hits
0
>>> wrapped_foo(value)
43
>>> wrapped_foo.stats.misses
1
>>> wrapped_foo.stats.hits
1

PARAMETER DESCRIPTION
cache_dir

Base directory for cache storage. If not provided, this defaults to a newly created temporary directory.

TYPE: Optional[Union[PathLike, str]] DEFAULT: None

Source code in src/pydvl/utils/caching/disk.py
def __init__(
    self,
    cache_dir: Optional[Union[os.PathLike, str]] = None,
) -> None:
    """Initialize the disk cache backend.

    Args:
        cache_dir: Base directory for cache storage.
            If not provided, this defaults to a newly created
            temporary directory.
    """
    super().__init__()
    if cache_dir is None:
        cache_dir = tempfile.mkdtemp(prefix="pydvl")
    self.cache_dir = Path(cache_dir)
    self.cache_dir.mkdir(exist_ok=True, parents=True)

wrap

wrap(
    func: Callable, *, config: Optional[CachedFuncConfig] = None
) -> CachedFunc

Wraps a function to cache its results.

PARAMETER DESCRIPTION
func

The function to wrap.

TYPE: Callable

config

Optional caching options for the wrapped function.

TYPE: Optional[CachedFuncConfig] DEFAULT: None

RETURNS DESCRIPTION
CachedFunc

The wrapped cached function.

Source code in src/pydvl/utils/caching/base.py
def wrap(
    self,
    func: Callable,
    *,
    config: Optional[CachedFuncConfig] = None,
) -> "CachedFunc":
    """Wraps a function to cache its results.

    Args:
        func: The function to wrap.
        config: Optional caching options for the wrapped function.

    Returns:
        The wrapped cached function.
    """
    return CachedFunc(
        func,
        cache_backend=self,
        config=config,
    )

get

get(key: str) -> Optional[Any]

Get a value from the cache.

PARAMETER DESCRIPTION
key

Cache key.

TYPE: str

RETURNS DESCRIPTION
Optional[Any]

Cached value or None if not found.

Source code in src/pydvl/utils/caching/disk.py
def get(self, key: str) -> Optional[Any]:
    """Get a value from the cache.

    Args:
        key: Cache key.

    Returns:
        Cached value or None if not found.
    """
    cache_file = self.cache_dir / key
    if not cache_file.exists():
        self.stats.misses += 1
        return None
    self.stats.hits += 1
    with cache_file.open("rb") as f:
        return cloudpickle.load(f)

set

set(key: str, value: Any) -> None

Set a value in the cache.

PARAMETER DESCRIPTION
key

Cache key.

TYPE: str

value

Value to cache.

TYPE: Any

Source code in src/pydvl/utils/caching/disk.py
def set(self, key: str, value: Any) -> None:
    """Set a value in the cache.

    Args:
        key: Cache key.
        value: Value to cache.
    """
    cache_file = self.cache_dir / key
    self.stats.sets += 1
    with cache_file.open("wb") as f:
        cloudpickle.dump(value, f, protocol=PICKLE_VERSION)

clear

clear() -> None

Deletes cache directory and recreates it.

Source code in src/pydvl/utils/caching/disk.py
def clear(self) -> None:
    """Deletes cache directory and recreates it."""
    shutil.rmtree(self.cache_dir)
    self.cache_dir.mkdir(exist_ok=True, parents=True)

combine_hashes

combine_hashes(*args: str) -> str

Join cache key components.

Source code in src/pydvl/utils/caching/disk.py
def combine_hashes(self, *args: str) -> str:
    """Join cache key components."""
    return os.pathsep.join(args)

InMemoryCacheBackend

InMemoryCacheBackend()

Bases: CacheBackend

In-memory cache backend that stores results in a dictionary.

Implements the CacheBackend interface for an in-memory-based cache. Stores cache entries as values in a dictionary, keyed by cache key. This allows sharing evaluations across threads in a single process.

The implementation is not thread-safe.

ATTRIBUTE DESCRIPTION
cached_values

Dictionary used to store cached values.

TYPE: Dict[str, Any]

Example

Basic usage:

>>> from pydvl.utils.caching.memory import InMemoryCacheBackend
>>> cache_backend = InMemoryCacheBackend()
>>> cache_backend.clear()
>>> value = 42
>>> cache_backend.set("key", value)
>>> cache_backend.get("key")
42

Callable wrapping:

>>> from pydvl.utils.caching.memory import InMemoryCacheBackend
>>> cache_backend = InMemoryCacheBackend()
>>> cache_backend.clear()
>>> value = 42
>>> def foo(x: int):
...     return x + 1
...
>>> wrapped_foo = cache_backend.wrap(foo)
>>> wrapped_foo(value)
43
>>> wrapped_foo.stats.misses
1
>>> wrapped_foo.stats.hits
0
>>> wrapped_foo(value)
43
>>> wrapped_foo.stats.misses
1
>>> wrapped_foo.stats.hits
1

Source code in src/pydvl/utils/caching/memory.py
def __init__(self) -> None:
    """Initialize the in-memory cache backend."""
    super().__init__()
    self.cached_values: Dict[str, Any] = {}

wrap

wrap(
    func: Callable, *, config: Optional[CachedFuncConfig] = None
) -> CachedFunc

Wraps a function to cache its results.

PARAMETER DESCRIPTION
func

The function to wrap.

TYPE: Callable

config

Optional caching options for the wrapped function.

TYPE: Optional[CachedFuncConfig] DEFAULT: None

RETURNS DESCRIPTION
CachedFunc

The wrapped cached function.

Source code in src/pydvl/utils/caching/base.py
def wrap(
    self,
    func: Callable,
    *,
    config: Optional[CachedFuncConfig] = None,
) -> "CachedFunc":
    """Wraps a function to cache its results.

    Args:
        func: The function to wrap.
        config: Optional caching options for the wrapped function.

    Returns:
        The wrapped cached function.
    """
    return CachedFunc(
        func,
        cache_backend=self,
        config=config,
    )

get

get(key: str) -> Optional[Any]

Get a value from the cache.

PARAMETER DESCRIPTION
key

Cache key.

TYPE: str

RETURNS DESCRIPTION
Optional[Any]

Cached value or None if not found.

Source code in src/pydvl/utils/caching/memory.py
def get(self, key: str) -> Optional[Any]:
    """Get a value from the cache.

    Args:
        key: Cache key.

    Returns:
        Cached value or None if not found.
    """
    value = self.cached_values.get(key, None)
    if value is not None:
        self.stats.hits += 1
    else:
        self.stats.misses += 1
    return value

set

set(key: str, value: Any) -> None

Set a value in the cache.

PARAMETER DESCRIPTION
key

Cache key.

TYPE: str

value

Value to cache.

TYPE: Any

Source code in src/pydvl/utils/caching/memory.py
def set(self, key: str, value: Any) -> None:
    """Set a value in the cache.

    Args:
        key: Cache key.
        value: Value to cache.
    """
    self.cached_values[key] = value
    self.stats.sets += 1

clear

clear() -> None

Deletes cache dictionary and recreates it.

Source code in src/pydvl/utils/caching/memory.py
def clear(self) -> None:
    """Deletes cache dictionary and recreates it."""
    del self.cached_values
    self.cached_values = {}

combine_hashes

combine_hashes(*args: str) -> str

Join cache key components.

Source code in src/pydvl/utils/caching/memory.py
def combine_hashes(self, *args: str) -> str:
    """Join cache key components."""
    return os.pathsep.join(args)

MemcachedClientConfig dataclass

MemcachedClientConfig(
    server: Tuple[str, int] = ("localhost", 11211),
    connect_timeout: float = 1.0,
    timeout: float = 1.0,
    no_delay: bool = True,
    serde: PickleSerde = PickleSerde(pickle_version=PICKLE_VERSION),
)

Configuration of the memcached client.

PARAMETER DESCRIPTION
server

A tuple of (IP|domain name, port).

TYPE: Tuple[str, int] DEFAULT: ('localhost', 11211)

connect_timeout

How many seconds to wait before raising ConnectionRefusedError on failure to connect.

TYPE: float DEFAULT: 1.0

timeout

Duration in seconds to wait for send or recv calls on the socket connected to memcached.

TYPE: float DEFAULT: 1.0

no_delay

If True, set the TCP_NODELAY flag, which may help with performance in some cases.

TYPE: bool DEFAULT: True

serde

Serializer / Deserializer ("serde"). The default PickleSerde should work in most cases. See pymemcache.client.base.Client for details.

TYPE: PickleSerde DEFAULT: PickleSerde(pickle_version=PICKLE_VERSION)
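Constructing a configuration for a remote server might look like this (a sketch; the hostname is hypothetical, and the optional Memcached dependencies, including pymemcache, must be installed):

```python
from pydvl.utils.caching.memcached import MemcachedClientConfig

config = MemcachedClientConfig(
    server=("cache.internal.example", 11211),  # hypothetical host
    connect_timeout=1.0,
    timeout=1.0,
    no_delay=True,
)
```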

MemcachedCacheBackend

MemcachedCacheBackend(config: MemcachedClientConfig = MemcachedClientConfig())

Bases: CacheBackend

Memcached cache backend for the distributed caching of functions.

Implements the CacheBackend interface for a memcached based cache. This allows sharing evaluations across processes and nodes in a cluster. You can run memcached as a service, locally or remotely, see the caching documentation.

PARAMETER DESCRIPTION
config

Memcached client configuration.

TYPE: MemcachedClientConfig DEFAULT: MemcachedClientConfig()

ATTRIBUTE DESCRIPTION
config

Memcached client configuration.

client

Memcached client instance.

Example

Basic usage:

>>> from pydvl.utils.caching.memcached import MemcachedCacheBackend
>>> cache_backend = MemcachedCacheBackend()
>>> cache_backend.clear()
>>> value = 42
>>> cache_backend.set("key", value)
>>> cache_backend.get("key")
42

Callable wrapping:

>>> from pydvl.utils.caching.memcached import MemcachedCacheBackend
>>> cache_backend = MemcachedCacheBackend()
>>> cache_backend.clear()
>>> value = 42
>>> def foo(x: int):
...     return x + 1
...
>>> wrapped_foo = cache_backend.wrap(foo)
>>> wrapped_foo(value)
43
>>> wrapped_foo.stats.misses
1
>>> wrapped_foo.stats.hits
0
>>> wrapped_foo(value)
43
>>> wrapped_foo.stats.misses
1
>>> wrapped_foo.stats.hits
1

PARAMETER DESCRIPTION
config

Memcached client configuration.

TYPE: MemcachedClientConfig DEFAULT: MemcachedClientConfig()

Source code in src/pydvl/utils/caching/memcached.py
def __init__(self, config: MemcachedClientConfig = MemcachedClientConfig()) -> None:
    """Initialize memcached cache backend.

    Args:
        config: Memcached client configuration.
    """

    super().__init__()
    self.config = config
    self.client = self._connect(self.config)

wrap

wrap(
    func: Callable, *, config: Optional[CachedFuncConfig] = None
) -> CachedFunc

Wraps a function to cache its results.

PARAMETER DESCRIPTION
func

The function to wrap.

TYPE: Callable

config

Optional caching options for the wrapped function.

TYPE: Optional[CachedFuncConfig] DEFAULT: None

RETURNS DESCRIPTION
CachedFunc

The wrapped cached function.

Source code in src/pydvl/utils/caching/base.py
def wrap(
    self,
    func: Callable,
    *,
    config: Optional[CachedFuncConfig] = None,
) -> "CachedFunc":
    """Wraps a function to cache its results.

    Args:
        func: The function to wrap.
        config: Optional caching options for the wrapped function.

    Returns:
        The wrapped cached function.
    """
    return CachedFunc(
        func,
        cache_backend=self,
        config=config,
    )

get

get(key: str) -> Optional[Any]

Get value from memcached.

PARAMETER DESCRIPTION
key

Cache key.

TYPE: str

RETURNS DESCRIPTION
Optional[Any]

Cached value or None if not found or client disconnected.

Source code in src/pydvl/utils/caching/memcached.py
def get(self, key: str) -> Optional[Any]:
    """Get value from memcached.

    Args:
        key: Cache key.

    Returns:
        Cached value or None if not found or client disconnected.
    """
    result = None
    try:
        result = self.client.get(key)
    except socket.timeout as e:
        self.stats.timeouts += 1
        warnings.warn(f"{type(self).__name__}: {str(e)}", RuntimeWarning)
    except OSError as e:
        self.stats.errors += 1
        warnings.warn(f"{type(self).__name__}: {str(e)}", RuntimeWarning)
    except AttributeError as e:
        # FIXME: this depends on _recv() failing on invalid sockets
        # See pymemcache.base.py,
        self.stats.reconnects += 1
        warnings.warn(f"{type(self).__name__}: {str(e)}", RuntimeWarning)
        self.client = self._connect(self.config)
    if result is None:
        self.stats.misses += 1
    else:
        self.stats.hits += 1
    return result

set

set(key: str, value: Any) -> None

Set value in memcached.

PARAMETER DESCRIPTION
key

Cache key.

TYPE: str

value

Value to cache.

TYPE: Any

Source code in src/pydvl/utils/caching/memcached.py
def set(self, key: str, value: Any) -> None:
    """Set value in memcached.

    Args:
        key: Cache key.
        value: Value to cache.
    """
    self.client.set(key, value, noreply=True)
    self.stats.sets += 1

clear

clear() -> None

Flush all values from memcached.

Source code in src/pydvl/utils/caching/memcached.py
def clear(self) -> None:
    """Flush all values from memcached."""
    self.client.flush_all(noreply=True)

combine_hashes

combine_hashes(*args: str) -> str

Join cache key components for Memcached.

Source code in src/pydvl/utils/caching/memcached.py
def combine_hashes(self, *args: str) -> str:
    """Join cache key components for Memcached."""
    return ":".join(args)

__getstate__

__getstate__() -> Dict

Enables pickling after a socket has been opened to the memcached server, by removing the client from the stored data.

Source code in src/pydvl/utils/caching/memcached.py
def __getstate__(self) -> Dict:
    """Enables pickling after a socket has been opened to the
    memcached server, by removing the client from the stored
    data."""
    odict = self.__dict__.copy()
    del odict["client"]
    return odict

__setstate__

__setstate__(d: Dict)

Restores a client connection after loading from a pickle.

Source code in src/pydvl/utils/caching/memcached.py
def __setstate__(self, d: Dict):
    """Restores a client connection after loading from a pickle."""
    self.config = d["config"]
    self.stats = d["stats"]
    self.client = self._connect(self.config)