Description
- Continues from Asynchronous Disk Access in Workers #4424
- Related: Create MutableMapping for automatic compression #3656
- Related: Thoughts on additional spilling layers #4629
- Related: Encapsulate spill buffer and memory_monitor #5891
Issue
As of today, data is eagerly deserialised as soon as it is un-spilled or received from another worker.
This has the benefit that we know that all data in Worker.data.fast is already deserialised when we need it and that Worker.data can be treated as an opaque MutableMapping.
However, this design simplicity comes at a very steep cost:
- When data is retrieved from the spill disk just for the purpose of sending it from worker A to worker B, then there's an unnecessary unpickle->pickle loop on worker A
- After a rebalance(), data is simply parked on a worker and there is a good chance it won't be needed on the same worker anytime soon. It's just as likely to be used by a task on the worker as it is to end up spilled or moved somewhere else again. This last point is mitigated by the fact that rebalance() uses a least recently inserted policy and a dead zone threshold around the cluster mean memory usage, meaning it won't pinball the same keys around.
- Soon, the Active Memory Manager will rebalance every few seconds.
Proposed design
Whenever a Worker acquires data in any way other than task completion, meaning
- over the network, through get_data
- over the network, because of the AMM
- over the network, because of the current implementation of rebalance(), replicate(), or scatter(broadcast=True) (soon to be reimplemented on top of the AMM)
- from disk, through unspilling
it should not unpickle the data immediately. Instead, it should keep it pickled. Worker.data.fast will contain a mix of pickled and unpickled data.
Whenever a task compute() starts, it will receive a mix of pickled and unpickled inputs. It will unpickle the pickled inputs and put them back into data.fast, replacing the pickled version.
Or, put differently: the current two mappings data.slow and data.fast should be replaced by three mappings, data.slow, data.fast_pickled, and data.fast_unpickled. compute() will always read from and write to data.fast_unpickled, which will internally read and delete from the other two; network receive and send will always read from and write to data.fast_pickled, which will likewise read and delete from the other mappings. This also carries the benefit that all pickle/unpickle work, whether due to network activity or spilling/unspilling, can now be encapsulated in a single module.
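To make the three-layer design concrete, here is a minimal sketch of such a mapping. All names (LayeredBuffer, set_pickled, get_pickled) are hypothetical illustrations, not distributed's actual API; the slow layer is a plain dict standing in for the on-disk store, and plain pickle stands in for dask's serialisation machinery.

```python
import pickle
from collections.abc import MutableMapping


class LayeredBuffer(MutableMapping):
    """Sketch of the proposed three-layer buffer.

    fast_unpickled holds live objects, fast_pickled holds raw bytes
    received over the network or read back from disk, and slow stands
    in for the on-disk layer. All pickle/unpickle work is encapsulated
    in this one class.
    """

    def __init__(self):
        self.fast_unpickled = {}
        self.fast_pickled = {}
        self.slow = {}

    def _discard(self, key):
        # Drop any stale copy of key from all layers
        for layer in (self.fast_unpickled, self.fast_pickled, self.slow):
            layer.pop(key, None)

    def __setitem__(self, key, value):
        # Task completion writes a live object
        self._discard(key)
        self.fast_unpickled[key] = value

    def set_pickled(self, key, frame):
        # Network receive / unspill stores raw bytes without deserialising
        self._discard(key)
        self.fast_pickled[key] = frame

    def __getitem__(self, key):
        # compute() reads a live object; unpickle lazily on first use,
        # replacing the pickled version
        if key in self.fast_unpickled:
            return self.fast_unpickled[key]
        if key in self.fast_pickled:
            frame = self.fast_pickled.pop(key)
        elif key in self.slow:
            frame = self.slow.pop(key)
        else:
            raise KeyError(key)
        value = pickle.loads(frame)
        self.fast_unpickled[key] = value
        return value

    def get_pickled(self, key):
        # Network send reads raw bytes; serialise only if all we hold
        # is the live object
        if key in self.fast_pickled:
            return self.fast_pickled[key]
        if key in self.slow:
            return self.slow[key]
        if key in self.fast_unpickled:
            return pickle.dumps(self.fast_unpickled[key])
        raise KeyError(key)

    def __delitem__(self, key):
        if key not in self:
            raise KeyError(key)
        self._discard(key)

    def __iter__(self):
        yield from self.fast_unpickled
        yield from self.fast_pickled
        yield from self.slow

    def __len__(self):
        return len(self.fast_unpickled) + len(self.fast_pickled) + len(self.slow)
```

Note how a get_data request for a key that sits in fast_pickled is served verbatim, with no unpickle->pickle loop, while a compute() read migrates the key into fast_unpickled exactly once.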
Challenges
- Care should be taken for the case where two tasks start at the same time on a multithreaded worker and require the same input: we should avoid unpickling the same data twice.
- We should figure out a way to isolate and display in the GUI serialisation/deserialisation time.
- Care should be taken for the edge case where sizeof(data) > memory_limit * target, which triggers immediate spilling to disk. If the data was acquired for the purpose of computing a task, this would likely mean going through an unnecessary spill->unspill cycle.
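  To illustrate the edge case numerically: with a 4 GiB memory limit and the default target fraction of 0.6, any single value larger than 2.4 GiB would be spilled the moment it lands. A sketch of a guard that skips that spill when the value was fetched as a task input (needed_by_task is an assumed flag, not an existing parameter):

  ```python
  def placement_on_receive(nbytes, *, needed_by_task, memory_limit, target=0.6):
      # Sketch: a value exceeding memory_limit * target would normally be
      # spilled immediately; keep it in fast if a task is about to use it,
      # avoiding a pointless spill->unspill cycle.
      if nbytes > memory_limit * target and not needed_by_task:
          return "slow"
      return "fast"
  ```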