Description
In some workloads with highly compressible data, we would like to automatically trade some computation time for more in-memory storage. Dask workers store data in a MutableMapping (the abstract interface that dict implements). So in principle all we would need to do is write a MutableMapping subclass whose __setitem__ and __getitem__ methods compress and decompress data on demand.
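As a rough illustration of that idea (not a proposed design), the simplest version compresses every value on the way in and decompresses it on the way out. The names `CompressedDict` and `stored_nbytes` are hypothetical, and the standard library's `pickle` and `zlib` stand in here for whatever serialization and compression a real implementation would use.

```python
import pickle
import zlib
from collections.abc import MutableMapping


class CompressedDict(MutableMapping):
    """Hypothetical mapping that stores each value as zlib-compressed pickle bytes."""

    def __init__(self):
        self._data = {}  # key -> compressed bytes

    def __setitem__(self, key, value):
        # Pay CPU time on every write to shrink what is held in memory
        self._data[key] = zlib.compress(pickle.dumps(value))

    def __getitem__(self, key):
        # Raises KeyError for missing keys, like a plain dict
        return pickle.loads(zlib.decompress(self._data[key]))

    def __delitem__(self, key):
        del self._data[key]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)

    def stored_nbytes(self):
        # Total size of the compressed payloads actually kept in memory
        return sum(len(v) for v in self._data.values())


store = CompressedDict()
store["x"] = [0] * 100_000  # highly compressible value
print(store.stored_nbytes(), "bytes stored for", len(store["x"]), "elements")
```

Every read pays the full decompression cost and returns a fresh copy of the value, which is the computation-for-memory trade described above.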
This would be an interesting task for someone who wants to help Dask and learn some internals but doesn't know a lot just yet, so I'm marking this as a good first issue. It is a useful task that doesn't require deep incidental Dask knowledge.
Here is a conceptual prototype of such a MutableMapping. This is completely untested, but maybe gives a sense of how I think about this problem. It's probably not ideal, though, so I would encourage others to come up with their own design.
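from collections import defaultdict
from collections.abc import MutableMapping
from typing import Callable, Dict, Tuple


class TypeCompression(MutableMapping):
    def __init__(
        self,
        types: Dict[type, Tuple[Callable, Callable]],
        storage=dict,
    ):
        self.types = types
        # One underlying mapping per value type
        self.storage = defaultdict(storage)

    def __setitem__(self, key, value):
        # Drop any stale copy stored under a different value type
        for d in self.storage.values():
            d.pop(key, None)
        typ = type(value)
        if typ in self.types:
            compress, decompress = self.types[typ]
            value = compress(value)
        self.storage[typ][key] = value

    def __getitem__(self, key):
        for typ, d in self.storage.items():
            if key in d:
                value = d[key]
                break
        else:
            raise KeyError(key)
        if typ in self.types:
            compress, decompress = self.types[typ]
            value = decompress(value)
        return value

    def __delitem__(self, key):
        for d in self.storage.values():
            if key in d:
                del d[key]
                return
        raise KeyError(key)

    def __iter__(self):
        for d in self.storage.values():
            yield from d

    def __len__(self):
        return sum(len(d) for d in self.storage.values())

This came up in #3624. cc @madsbk and @jakirkham from that PR. cc also @eric-czech, who was maybe curious about automatic compression/decompression.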
People looking at compression might want to look at and use Dask's serialization and compression machinery in distributed.protocol (maybe start by looking at the dumps, serialize, and maybe_compress functions).
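For anyone experimenting before digging into that module, here is a standalone sketch of the "compress only if it pays off" idea that a name like maybe_compress suggests. It does not call or reproduce Dask's code: `compress_if_worthwhile`, `decompress`, and the `min_ratio` threshold are hypothetical, and `pickle` plus `zlib` are stand-ins from the standard library.

```python
import pickle
import zlib


def compress_if_worthwhile(value, min_ratio=0.9):
    """Return (tag, payload), compressing only when it shrinks the data enough.

    Hypothetical helper: min_ratio is an assumed threshold, not a Dask setting.
    """
    raw = pickle.dumps(value)
    packed = zlib.compress(raw)
    if len(packed) < min_ratio * len(raw):
        return ("zlib", packed)
    # Compression did not help, so keep the uncompressed bytes
    return (None, raw)


def decompress(tagged):
    """Invert compress_if_worthwhile using the tag to pick the right path."""
    tag, payload = tagged
    if tag == "zlib":
        payload = zlib.decompress(payload)
    return pickle.loads(payload)


tagged = compress_if_worthwhile(b"a" * 100_000)
restored = decompress(tagged)
```

A pair of functions like this has the (compress, decompress) shape that a type-keyed mapping of compression functions would expect, so data that does not compress well is stored as-is rather than paying for a useless round trip.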