Description
Currently, reading from or writing to disk blocks the event loop:
distributed/distributed/worker.py
Lines 1948 to 1964 in 74e8dc6
```python
def put_key_in_memory(self, ts, value, transition=True):
    if ts.key in self.data:
        ts.state = "memory"
        return
    if ts.key in self.actors:
        self.actors[ts.key] = value
    else:
        start = time()
        self.data[ts.key] = value
        ts.state = "memory"
        stop = time()
        if stop - start > 0.020:
            ts.startstops.append(
                {"action": "disk-write", "start": start, "stop": stop}
            )
```
This can cause workers to become unresponsive, especially on systems with very slow disk access. Ideally, disk I/O would happen concurrently with the rest of the worker's activity. There are a couple of ways to do this.
Offload to separate thread
We could move all manipulation of the `Worker.data` MutableMapping to a separate thread, as we do today with the `offload` function, which we use for deserialization.
However, if we do this then we need to do it for every access to `Worker.data`, including seemingly innocuous checks like `if key in self.data`, which may become annoying.
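As a rough illustration of the thread approach, here is a minimal sketch using only the standard library. `SlowStore` is a hypothetical stand-in for `Worker.data` that sleeps on every write to simulate a slow disk; it is not the zict mapping, and `run_in_executor` is used here in place of Dask's own `offload` helper. The heartbeat task shows that the event loop keeps running while the write is in progress, and the membership check has to go through the same thread, which is the annoyance described above.

```python
import asyncio
import time
from collections.abc import MutableMapping
from concurrent.futures import ThreadPoolExecutor


class SlowStore(MutableMapping):
    """Hypothetical stand-in for Worker.data; writes simulate a slow disk."""

    def __init__(self, delay=0.05):
        self._d = {}
        self.delay = delay

    def __setitem__(self, key, value):
        time.sleep(self.delay)  # simulated blocking disk write
        self._d[key] = value

    def __getitem__(self, key):
        return self._d[key]

    def __delitem__(self, key):
        del self._d[key]

    def __iter__(self):
        return iter(self._d)

    def __len__(self):
        return len(self._d)


async def heartbeat(ticks, interval=0.005):
    # Records a timestamp each time the event loop gets to run this task.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def main():
    store = SlowStore()
    # A single worker thread serializes all access to the mapping.
    executor = ThreadPoolExecutor(max_workers=1)
    loop = asyncio.get_running_loop()
    ticks = []
    hb = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat start

    # The write runs in the thread, so the loop stays free for the heartbeat.
    await loop.run_in_executor(executor, store.__setitem__, "x", 123)
    # Even a membership check must be routed through the same thread.
    present = await loop.run_in_executor(executor, store.__contains__, "x")

    hb.cancel()
    executor.shutdown()
    return present, len(ticks)


present, n_ticks = asyncio.run(main())
```

If the write were called directly as `store["x"] = 123` inside `main`, the heartbeat would not tick at all during the simulated disk write.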
Handle Disk logic directly in the worker
We could also break apart the MutableMapping abstraction, and unpack the zict logic directly into the Worker code. This would allow us to keep a lot of the fast access in the event loop, while treating disk access specially. It would also open the door for more performance improvements, like trying to schedule tasks for data that is currently in memory rather than data that is currently on disk. In general if we want to improve out-of-memory handling in Dask we'll eventually need to break this abstraction.
However, breaking this abstraction comes at considerable cost. First, it means that there is more to manage in a monolithic Worker codebase (zict has tricky logic that we haven't really had to touch or maintain in years). Second, it means that we'll have to find a way that still lets other groups like RAPIDS extend the storage hierarchy (they have device->host->disk rather than just host->disk).
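To make the trade-off more concrete, here is a minimal sketch of what handling the disk tier explicitly might look like. Everything in it is hypothetical: `TieredStore`, `spill`, `get`, and `prefer_in_memory` are illustrative names, not the Worker or zict API, and pickle files in a temporary directory stand in for real spill storage. Membership checks and in-memory reads stay synchronous on the event loop, while disk reads and writes go through `asyncio.to_thread` (Python 3.9+). The same residency bookkeeping also lets a caller order tasks so that those whose inputs are already in memory come first.

```python
import asyncio
import os
import pickle
import tempfile


class TieredStore:
    """Hypothetical host->disk store; a sketch, not the Worker or zict API."""

    def __init__(self, directory):
        self.directory = directory
        self.fast = {}        # key -> value held in memory
        self.on_disk = set()  # keys whose values currently live only on disk

    def __contains__(self, key):
        # Pure in-memory bookkeeping, so it is cheap to call from the event loop.
        return key in self.fast or key in self.on_disk

    def in_memory(self, key):
        return key in self.fast

    def _path(self, key):
        return os.path.join(self.directory, str(key))

    def _write(self, key, value):
        with open(self._path(key), "wb") as f:
            pickle.dump(value, f)

    def _read(self, key):
        with open(self._path(key), "rb") as f:
            return pickle.load(f)

    async def spill(self, key):
        # The blocking write happens in a thread; the loop keeps running.
        value = self.fast[key]
        await asyncio.to_thread(self._write, key, value)
        self.fast.pop(key, None)
        self.on_disk.add(key)

    async def get(self, key):
        if key in self.fast:
            return self.fast[key]  # fast path, no thread hop
        value = await asyncio.to_thread(self._read, key)
        self.fast[key] = value
        self.on_disk.discard(key)
        return value


def prefer_in_memory(store, tasks):
    # tasks maps a task name to the keys it depends on; tasks with fewer
    # dependencies outside memory sort first.
    return sorted(tasks, key=lambda name: sum(not store.in_memory(k) for k in tasks[name]))


async def demo():
    with tempfile.TemporaryDirectory() as d:
        store = TieredStore(d)
        store.fast.update({"a": 1, "b": 2})
        await store.spill("b")
        order = prefer_in_memory(store, {"needs_b": ["b"], "needs_a": ["a"]})
        value = await store.get("b")
        return order, value, "b" in store, store.in_memory("b")


order, value, present, resident = asyncio.run(demo())
```

This sketch hard-codes two tiers, which is exactly the extensibility concern raised above: a device->host->disk hierarchy would need the tiers to be pluggable rather than baked into the class.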