|
2 | 2 |
|
3 | 3 | import logging |
4 | 4 | import time |
5 | | -from collections.abc import Mapping |
| 5 | +from collections.abc import Mapping, Sized |
6 | 6 | from contextlib import contextmanager |
7 | 7 | from functools import partial |
8 | | -from typing import Any, Literal, NamedTuple |
| 8 | +from typing import Any, Literal, NamedTuple, Protocol |
9 | 9 |
|
10 | 10 | from packaging.version import parse as parse_version |
11 | 11 |
|
@@ -33,6 +33,36 @@ def __sub__(self, other: SpilledSize) -> SpilledSize: # type: ignore |
33 | 33 | return SpilledSize(self.memory - other.memory, self.disk - other.disk) |
34 | 34 |
|
35 | 35 |
|
| 36 | +class ManualEvictProto(Protocol): |
| 37 | + """Duck-type API that a third-party alternative to SpillBuffer must respect (in |
| 38 | + addition to MutableMapping) if it wishes to support spilling when the |
| 39 | + ``distributed.worker.memory.spill`` threshold is surpassed. |
| 40 | +
|
| 41 | + This is public API. At the moment of writing, Dask-CUDA implements this protocol in |
| 42 | + the ProxifyHostFile class. |
| 43 | + """ |
| 44 | + |
| 45 | + @property |
| 46 | + def fast(self) -> Sized | bool: |
| 47 | + """Access to fast memory. This is normally a MutableMapping, but for the purpose |
| 48 | + of the manual eviction API it is just tested for emptiness to know if there is |
| 49 | + anything to evict. |
| 50 | + """ |
| 51 | + ... # pragma: nocover |
| 52 | + |
| 53 | + def evict(self) -> int: |
| 54 | + """Manually evict a key/value pair from fast to slow memory. |
| 55 | + Return size of the evicted value in fast memory. |
| 56 | +
|
| 57 | + If the eviction failed for whatever reason, return -1. This method must |
| 58 | + guarantee that the key/value pair that caused the issue has been retained in |
| 59 | + fast memory and that the problem has been logged internally. |
| 60 | +
|
| 61 | + This method never raises. |
| 62 | + """ |
| 63 | + ... # pragma: nocover |
| 64 | + |
| 65 | + |
36 | 66 | class SpillBuffer(zict.Buffer): |
37 | 67 | """MutableMapping that automatically spills out dask key/value pairs to disk when |
38 | 68 | the total size of the stored data exceeds the target. If max_spill is provided the |
@@ -163,11 +193,14 @@ def __setitem__(self, key: str, value: Any) -> None: |
163 | 193 | assert key not in self.slow |
164 | 194 |
|
165 | 195 | def evict(self) -> int: |
166 | | - """Manually evict the oldest key/value pair, even if target has not been reached. |
167 | | - Returns sizeof(value). |
| 196 | + """Implementation of :meth:`ManualEvictProto.evict`. |
| 197 | +
|
| 198 | + Manually evict the oldest key/value pair, even if target has not been |
| 199 | + reached. Returns sizeof(value). |
168 | 200 | If the eviction failed (value failed to pickle, disk full, or max_spill |
169 | 201 | exceeded), return -1; the key/value pair that caused the issue will remain in |
170 | | - fast. This method never raises. |
| 202 | + fast. The exception has been logged internally. |
| 203 | + This method never raises. |
171 | 204 | """ |
172 | 205 | try: |
173 | 206 | with self.handle_errors(None): |
|
0 commit comments