Description
There is a race condition where, after `Scheduler.remove_worker` completes and the worker has been removed from the scheduler's perspective, comms with the worker are still open. So:
- Subsequent heartbeats from the worker cause the scheduler to treat it as a new worker connecting
- Any messages the worker might still send will be processed, and could cause errors (if handlers expect the worker to exist, and it doesn't).
Basically, `remove_worker` results in partially-removed state (the `WorkerState` and other objects are removed, but connections are not).
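To make the interleaving concrete, here is a small self-contained toy model of the race in plain asyncio. It is not distributed's code; all names are illustrative. The "scheduler" forgets the worker by deleting a dict entry but never closes the connection, so a later heartbeat over that still-open connection is answered with "missing":

```python
import asyncio

# Toy model only: the "scheduler" state is a dict; the "connection" is a
# separate object that remove_worker never closes.

class ToyConn:
    def __init__(self):
        self.closed = False

async def remove_worker(workers: dict, addr: str):
    # Mirrors the bug: forget the worker, but leave its connection open.
    workers.pop(addr, None)

async def heartbeat(workers: dict, conn: ToyConn, addr: str) -> str:
    if conn.closed:
        return "no reply"   # a closed comm can't deliver a heartbeat
    if addr not in workers:
        return "missing"    # scheduler treats this as an unknown worker
    return "ok"

async def main():
    addr = "tcp://worker-1"
    workers = {addr: "WorkerState(...)"}
    conn = ToyConn()

    await remove_worker(workers, addr)           # state gone...
    print(await heartbeat(workers, conn, addr))  # ...comm still open -> "missing"

asyncio.run(main())
```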
Full trace-through
Copied from #6263 (comment)
- `close_worker` calls `remove_worker`. (It also redundantly enqueues a close message to the worker—`remove_worker` is about to do the same—though I don't think this makes a difference here.)
- `remove_worker` does a bunch of stuff, including deleting the `BatchedSend` to that worker which has the `{op: close}` message queued on it. It does not wait until the BatchedSend queue is flushed.
- `remove_worker` removes the `WorkerState` entry for that worker.
- At this point, the scheduler has removed all knowledge of the worker. However, the `close` message hasn't even been sent to it yet—it's still queued inside a BatchedSend, which may not send the message for up to 5ms more. And even after the message has been sent, `Worker.close` still has to get scheduled on the worker event loop and run (which could take a while; see Worker event loop performance hampered by GIL/convoy effect #6325).

  There are multiple sources of truth for when a worker is gone. In some places, it's whether `addr in Scheduler.workers` or `addr in Scheduler.stream_comms`. In others, it's whether a comm to that worker is closed. The entry being removed from the dict and the comm being closed are disjoint events (see the sketch after this list).

  Thus, between when `Scheduler.remove_worker` ends and all comms to the worker actually close, we are in a degenerate state and exposed to race conditions. The scheduler forgets about the worker before closing communications to that worker, or even confirming that the worker has received the message to close. Explicitly flushing and closing the `BatchedSend` would alleviate this, though not resolve it: while waiting for our send side to close, we could still receive messages from the now-removed worker.

  Simply moving the flush-and-close to the top of `Scheduler.remove_worker`—before anything else happens—would, I think, fix the problem. Then, we wouldn't be removing state related to the worker until we were guaranteed the worker connection was closed.
- After `remove_worker` has run, but before the close message actually gets sent over the BatchedSend / `Worker.close` starts running, the worker sends another heartbeat to the scheduler.
- From the scheduler's perspective, this worker doesn't exist, so it replies "missing" to the worker.
- Though the worker is in state `closing_gracefully`, it still tries to re-register.
  - Fun fact (not relevant to this, but yet another broken thing in BatchedSend land): this is going to call `Worker.batched_stream.start` with a new comm object, even though `Worker.batched_stream` is already running with the previous comm object. This is the double-start bug I pointed out in https://github.com/dask/distributed/pull/6272/files#r866082750. This will swap out the `comm` that the `BatchedSend` is using, and launch a second `background_send` coroutine. Surprisingly, I think having multiple `background_send`s racing to process one BatchedSend is still safe, just silly.
  - The `BatchedSend` has now lost its reference to the original comm. However, one `Worker.handle_scheduler` is still running with that original comm, and now one is running with the new comm too. Since there's still a reference to the old comm, it isn't closed.
  - Because the worker's old comm wasn't closed, the old `Scheduler.handle_worker` is still running, even though, from the scheduler's perspective, the worker it's supposed to be handling doesn't exist anymore. Yay [Discussion] Structured concurrency #6201 — if we had a handle on this coroutine, we could have cancelled it in `remove_worker`.
- The scheduler handles this "new" worker connecting. (This passes through the buggy code of Scheduler worker reconnect drops messages #6341, though I don't think that actually makes a difference in this case.) This is where all of the "Unexpected worker completed task" messages come from, followed by another "Register worker" and "Starting worker compute stream".
- Eventually, `Worker.close` actually closes (one of) its batched comms to the scheduler.
- This triggers a second `remove_worker` on the scheduler, removing the entry for the "new" worker.
  - There's probably still a `Scheduler.handle_worker` running for the old comm too. I presume it eventually stops when the worker actually shuts down and severs the connection? When it does, it probably won't run `remove_worker` another time, because the address is already gone from `Scheduler.stream_comms`.
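To underline the "multiple sources of truth" point flagged above, here is a tiny illustrative snippet (not distributed code; only the attribute names `workers` and `stream_comms` are borrowed from `Scheduler`). Right after `remove_worker` returns, the dict-based checks claim the worker is gone, while the comm that `Scheduler.handle_worker` is still reading from is not closed:

```python
from typing import Any, Mapping

def worker_gone_checks(workers: Mapping[str, Any],
                       stream_comms: Mapping[str, Any],
                       comm_closed: bool,
                       addr: str) -> dict:
    """The three checks used interchangeably to mean "this worker is gone"."""
    return {
        "addr not in workers": addr not in workers,
        "addr not in stream_comms": addr not in stream_comms,
        "comm to worker closed": comm_closed,
    }

# During the race window: state dicts already cleared, comm still open.
print(worker_gone_checks(workers={}, stream_comms={}, comm_closed=False,
                         addr="tcp://worker-1"))
# {'addr not in workers': True, 'addr not in stream_comms': True,
#  'comm to worker closed': False}
```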
Even if we do #6350 (which will resolve the heartbeat-reconnect issue), we still need to ensure that `remove_worker` closes the connection to the worker before removing other state.
I think the easiest way to do that is simply to `await self.stream_comms[address].close()` at the top of `remove_worker`, before removing other state. There are still some race conditions with concurrent `remove_worker` calls to work out, though.
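A minimal sketch of that ordering, with stand-in state to keep it self-contained (the real `Scheduler.remove_worker` takes more arguments and removes far more state, and this assumes `BatchedSend.close()` flushes queued messages before closing the comm):

```python
# Sketch only, not distributed's actual code: the point is the ordering --
# flush-and-close the batched comm to the worker *before* forgetting it.
class SchedulerSketch:
    def __init__(self):
        self.workers = {}        # address -> WorkerState (stand-in)
        self.stream_comms = {}   # address -> BatchedSend (stand-in)

    async def remove_worker(self, address: str):
        bcomm = self.stream_comms.get(address)
        if bcomm is not None:
            # Assumed: close() flushes anything still queued (including the
            # {"op": "close"} message) and then closes the underlying comm,
            # so the worker can no longer heartbeat or send on a connection
            # the scheduler has already forgotten about.
            await bcomm.close()

        # Only now remove the WorkerState, the stream_comms entry, and the
        # rest of the scheduler-side state, as the existing code does.
        self.stream_comms.pop(address, None)
        self.workers.pop(address, None)
```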