
Worker may message or reconnect to scheduler while worker is closing #6354

@gjoseph92

Description


There is a race condition where, after Scheduler.remove_worker completes and the worker has been removed from the scheduler's perspective, comms with the worker are still open. So:

  • Subsequent heartbeats from the worker cause the scheduler to treat it as a new worker connecting
  • Any messages the worker might send will still be processed, and could cause errors (if handlers expect the worker to exist and it doesn't).

Basically, remove_worker leaves the worker in a partially-removed state: the WorkerState and other scheduler-side objects are removed, but the connections are not.
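
To make that window concrete, here's a minimal toy sketch of the ordering problem (ToyScheduler and everything in it are invented for illustration, not distributed's real classes): state is dropped before the comm is closed, so a late heartbeat from the "removed" worker looks like a brand-new worker connecting.

```python
import asyncio

# Toy sketch only: ToyScheduler and its methods are invented for illustration,
# not distributed's real classes.
class ToyScheduler:
    def __init__(self):
        self.workers = {"tcp://w1": "WorkerState"}       # stand-in for Scheduler.workers
        self.stream_comms = {"tcp://w1": "BatchedSend"}  # stand-in for Scheduler.stream_comms

    async def remove_worker(self, address):
        # All scheduler-side state is forgotten immediately...
        self.workers.pop(address, None)
        self.stream_comms.pop(address, None)
        # ...but nothing here waits for the comm to the worker to actually close,
        # so the worker can keep sending for a while.

    def heartbeat(self, address):
        if address not in self.workers:
            # Unknown address: treated like a brand-new worker connecting.
            self.workers[address] = "WorkerState (re-registered!)"
            return "missing -> re-registered"
        return "ok"

async def main():
    s = ToyScheduler()
    await s.remove_worker("tcp://w1")
    # The worker hasn't seen the (still-queued) close message yet and heartbeats again:
    print(s.heartbeat("tcp://w1"))  # -> "missing -> re-registered"

asyncio.run(main())
```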

Full trace-through

Copied from #6263 (comment)

  1. close_worker calls remove_worker. (It also redundantly enqueues a close message to the worker; remove_worker is about to do the same, though I don't think this makes a difference here.)

  2. remove_worker does a bunch of stuff, including deleting the BatchedSend to that worker which has the {op: close} message queued on it. It does not wait until the BatchedSend queue is flushed.

  3. remove_worker removes the WorkerState entry for that worker

  4. At this point, the scheduler has removed all knowledge of the worker. However, the close message hasn't even been sent to it yet; it's still queued inside a BatchedSend, which may not send the message for up to 5ms more. And even after the message has been sent, Worker.close still has to get scheduled on the worker event loop and run (which could take a while; see Worker event loop performance hampered by GIL/convoy effect #6325). There are multiple sources of truth for when a worker is gone. In some places, it's whether addr in Scheduler.workers or addr in Scheduler.stream_comms; in others, it's whether a comm to that worker is closed. The entry being removed from the dict and the comm being closed are disjoint events.

    Thus, between when Scheduler.remove_worker ends and when all comms to that worker actually close, we are in a degenerate state and exposed to race conditions. The scheduler forgets about the worker before closing communications to that worker, or even confirming that the worker has received the message to close. Explicitly flushing and closing the BatchedSend would alleviate this, though not resolve it: while waiting for our send side to close, we could still receive messages from the now-removed worker.

    I think simply moving the flush-and-close to the top of Scheduler.remove_worker, before anything else happens, would fix the problem. Then we wouldn't be removing state related to the worker until we were guaranteed the worker connection was closed.

  5. After remove_worker has run, but before the close message actually gets sent over the BatchedSend / Worker.close starts running, the worker sends another heartbeat to the scheduler.

  6. From the scheduler's perspective, this worker doesn't exist, so it replies "missing" to the worker.

  7. Though the worker is in state closing_gracefully, it still tries to re-register.

    1. Fun fact (not relevant to this, but yet another broken thing in BatchedSend land): this is going to call Worker.batched_stream.start with a new comm object, even though Worker.batched_stream is already running with the previous comm object. This is the double-start bug I pointed out in https://github.com/dask/distributed/pull/6272/files#r866082750 (see the sketch after this list). It will swap out the comm that the BatchedSend is using and launch a second background_send coroutine. Surprisingly, I think having multiple background_sends racing to process one BatchedSend is still safe, just silly.
    2. The BatchedSend has now lost its reference to the original comm. However, one Worker.handle_scheduler is still running with that original comm, and now one is running with the new comm too. Since there's still a reference to the old comm, it isn't closed.
    3. Because the worker's old comm wasn't closed, the old Scheduler.handle_worker is still running, even though, from the scheduler's perspective, the worker it's supposed to be handling doesn't exist anymore. Yay [Discussion] Structured concurrency #6201: if we had a handle on this coroutine, we could have cancelled it in remove_worker.
  8. The scheduler handles this "new" worker connecting. (This passes through the buggy code of Scheduler worker reconnect drops messages #6341, though I don't think that actually makes a difference in this case.) This is where all of the "Unexpected worker completed task" messages come from, followed by another "Register worker" and "Starting worker compute stream".

  9. Eventually, Worker.close actually closes (one of) its batched comms to the scheduler.

  10. This triggers a second remove_worker on the scheduler, removing the entry for the "new" worker.

    1. There's probably still a Scheduler.handle_worker running for the old comm too. I presume it eventually stops when the worker actually shuts down and severs the connection? When it does, it probably won't run remove_worker another time, because the address is already gone from Scheduler.stream_comms.
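
As for the double-start bug in step 7.1, here's a simplified stand-in (ToyBatchedSend and ToyComm are invented for illustration; this is not the real BatchedSend) showing why calling start again with a new comm is troublesome: the old comm reference is silently dropped without being closed, and a second sender coroutine ends up racing over the same buffer.

```python
import asyncio
from collections import deque

# Simplified stand-in for BatchedSend (not the real implementation) to illustrate
# the double-start problem: start() has no guard against already running, so a
# second call swaps the comm and spawns a second sender coroutine.
class ToyBatchedSend:
    def __init__(self):
        self.buffer = deque()
        self.comm = None
        self.senders = 0

    def start(self, comm):
        self.comm = comm                          # old comm reference silently dropped
        asyncio.ensure_future(self._background_send())

    async def _background_send(self):
        self.senders += 1
        while True:
            while self.buffer:
                await self.comm.write(self.buffer.popleft())
            await asyncio.sleep(0.005)            # stand-in for the batching interval

class ToyComm:
    def __init__(self, name):
        self.name = name
        self.closed = False
    async def write(self, msg):
        print(f"{self.name} <- {msg}")

async def main():
    bs = ToyBatchedSend()
    old, new = ToyComm("old-comm"), ToyComm("new-comm")
    bs.start(old)
    bs.start(new)                  # re-registration: second start, second sender
    bs.buffer.append({"op": "close"})
    await asyncio.sleep(0.02)
    print("sender coroutines running:", bs.senders)  # 2, both racing over one buffer
    print("old comm closed?", old.closed)            # False; nobody ever closed it

asyncio.run(main())
```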

Even if we do #6350 (which will resolve the heartbeat-reconnect issue), we still need to ensure that remove_worker closes the connection to the worker before removing other state.

I think the easiest way to do that is simply to await self.stream_comms[address].close() at the top of remove_worker, before removing other state. There are still some race conditions with concurrent remove_worker calls to work out, though.
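
Roughly, the ordering I have in mind would look something like this (sketch only, with a simplified signature; it deliberately ignores the concurrent-remove_worker races mentioned above):

```python
# Sketch only: simplified signature, no error handling, and the races between
# concurrent remove_worker calls are not addressed here.
async def remove_worker(self, address, *, close=True):
    bcomm = self.stream_comms.get(address)
    if bcomm is not None:
        if close:
            bcomm.send({"op": "close"})
        # Flush and close the batched comm *before* forgetting the worker, so no
        # further heartbeats or batched messages can arrive from a "removed" worker.
        await bcomm.close()
    # ...only now remove the WorkerState, the stream_comms entry, task state, etc.
```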

Labels

    bug (Something is broken), stability (Issue or feature related to cluster stability, e.g. deadlock)
