
Deserialise data on demand #5900

@crusaderky


Issue

As of today, data is eagerly deserialised as soon as it is un-spilled and as soon as it is received from another worker.
This has the benefit that all data in Worker.data.fast is guaranteed to be already deserialised by the time we need it, and that Worker.data can be treated as an opaque MutableMapping.
However, this design simplicity comes at a very steep cost:

  • When data is retrieved from the spill disk just for the purpose of sending it from worker A to worker B, it goes through an unnecessary unpickle->pickle round trip on worker A
  • After a rebalance(), data is simply parked on a worker, and there is a good chance it won't be needed there anytime soon: it is just as likely to end up spilled or moved somewhere else again as it is to be used by a task on that worker. This is mitigated by the fact that rebalance() uses a least-recently-inserted policy and a dead-zone threshold around the cluster's mean memory usage, so it won't pinball the same keys around.
  • Soon, the Active Memory Manager will rebalance every few seconds.

Proposed design

Whenever a Worker acquires data in any way other than task completion, meaning

  • over the network, through get_data
  • over the network, because of the AMM
  • over the network, because of the current implementation of rebalance(), replicate(), or scatter(broadcast=True) (soon to be reimplemented on top of the AMM)
  • from the disk, through unspilling

it should not unpickle the data immediately. Instead, it should keep it pickled. Worker.data.fast will contain a mix of pickled and unpickled data.
Whenever a task compute() starts, it will receive a mix of pickled and unpickled inputs. It will unpickle the pickled inputs and put them back into data.fast, replacing the pickled version.

Or, if you prefer: the current two mappings data.slow and data.fast should be replaced by three: data.slow, data.fast_pickled, and data.fast_unpickled. compute() will always read and write through data.fast_unpickled, which will internally read from and delete on the other two; network receive and send will always read and write through data.fast_pickled, which will do likewise. This also carries the benefit that all pickle/unpickle work, whether due to network activity or spilling/unspilling, can now be encapsulated in a single module.
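
A minimal sketch of this layered design, assuming plain pickle bytes in place of distributed's actual frame-based serialisation and a plain MutableMapping for the spill layer; the class and method names (LazyDeserialisingBuffer, get_pickled, set_pickled) are hypothetical, not the real zict/distributed API:

```python
import pickle
from collections.abc import MutableMapping


class LazyDeserialisingBuffer(MutableMapping):
    """Three-layer mapping: live objects, pickled bytes, spilled-to-disk."""

    def __init__(self, slow: MutableMapping):
        self.fast_unpickled: dict = {}  # live Python objects
        self.fast_pickled: dict = {}    # raw bytes from network/disk
        self.slow = slow                # on-disk spill mapping, e.g. zict.File

    # Path used by compute(): always yields live objects,
    # unpickling lazily and caching the result.
    def __getitem__(self, key):
        if key in self.fast_unpickled:
            return self.fast_unpickled[key]
        if key in self.fast_pickled:
            frames = self.fast_pickled.pop(key)
        else:
            frames = self.slow.pop(key)  # unspill without unpickling...
        value = pickle.loads(frames)     # ...then unpickle on demand
        self.fast_unpickled[key] = value
        return value

    def __setitem__(self, key, value):  # task completion stores live objects
        self.fast_unpickled[key] = value

    # Path used by network send/receive: always yields raw bytes.
    def get_pickled(self, key) -> bytes:
        if key in self.fast_pickled:
            return self.fast_pickled[key]
        if key in self.slow:
            # No unpickle->pickle round trip when serving spilled data
            frames = self.slow.pop(key)
            self.fast_pickled[key] = frames
            return frames
        return pickle.dumps(self.fast_unpickled[key])

    def set_pickled(self, key, frames: bytes):
        self.fast_pickled[key] = frames

    def __delitem__(self, key):
        for mapping in (self.fast_unpickled, self.fast_pickled, self.slow):
            if key in mapping:
                del mapping[key]
                return
        raise KeyError(key)

    def __iter__(self):
        yield from self.fast_unpickled
        yield from self.fast_pickled
        yield from self.slow

    def __len__(self):
        return len(self.fast_unpickled) + len(self.fast_pickled) + len(self.slow)
```

With this split, get_data on worker A can serve a spilled key via get_pickled without ever materialising the Python object, while a compute() that later touches the same key pays the unpickle cost exactly once.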

Challenges

  • Care should be taken when two tasks start at the same time on a multithreaded worker and require the same input; we should avoid unpickling it twice (see the sketch after this list).
  • We should figure out a way to isolate serialisation/deserialisation time and display it in the GUI.
  • Care should be taken for the edge case where sizeof(data) > memory_limit * target, which triggers immediate spilling to disk. If the data was acquired for the purpose of computing a task, this would likely mean going through an unnecessary spill->unspill cycle.
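
For the first point, a minimal de-duplication sketch using one threading.Event per in-flight key; OnceUnpickler and its method are hypothetical, and in practice this logic would more likely be folded into the worker's existing per-key state machine than live in a standalone class:

```python
import pickle
import threading


class OnceUnpickler:
    """Ensure each key is unpickled at most once, even when several
    threads request the same input concurrently."""

    def __init__(self):
        self._lock = threading.Lock()
        self._in_flight: dict[str, threading.Event] = {}
        self._results: dict[str, object] = {}

    def unpickle(self, key: str, frames: bytes):
        with self._lock:
            if key in self._results:
                return self._results[key]  # already unpickled by another thread
            event = self._in_flight.get(key)
            if event is None:
                # This thread takes ownership of the unpickle work
                self._in_flight[key] = event = threading.Event()
                owner = True
            else:
                owner = False
        if owner:
            value = pickle.loads(frames)  # the expensive bit, done exactly once
            with self._lock:
                self._results[key] = value
                del self._in_flight[key]
            event.set()
            return value
        event.wait()  # another thread is unpickling this key right now
        with self._lock:
            return self._results[key]
```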
